Merge trunk into HA branch.

Several conflicts around introduction of protobuf translator for DatanodeProtocol - mostly trivial resolutions.

NB: this does not successfully pass any tests since the HAStatus field needs
to be integrated into the HeartbeatResponse Protobuf implementation.
That will be a separate commit for clearer history.



git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1214518 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2011-12-14 22:47:41 +00:00
parent 6c2da4bc0f
commit 8134b1c870
45 changed files with 2634 additions and 292 deletions

View File

@ -1,6 +1,9 @@
Hadoop HDFS Change Log
Trunk (unreleased changes)
INCOMPATIBLE CHANGES
HDFS-2676. Remove Avro RPC. (suresh)
NEW FEATURES
HDFS-395. DFS Scalability: Incremental block reports. (Tomasz Nykiel
via hairong)
@ -32,10 +35,16 @@ Trunk (unreleased changes)
HDFS-2647. Used protobuf based RPC for InterDatanodeProtocol,
ClientDatanodeProtocol, JournalProtocol, NamenodeProtocol. (suresh)
HDFS-2663. Handle protobuf optional parameters correctly. (suresh)
HDFS-2666. Fix TestBackupNode failure. (suresh)
HDFS-234. Integration with BookKeeper logging system. (Ivan Kelly
via jitendra)
HDFS-2663. Optional protobuf parameters are not handled correctly.
(suresh)
HDFS-2661. Enable protobuf RPC for DatanodeProtocol. (jitendra)
IMPROVEMENTS
HADOOP-7524 Change RPC to allow multiple protocols including multuple
@ -106,6 +115,8 @@ Trunk (unreleased changes)
HDFS-2650. Replace @inheritDoc with @Override. (Hari Mankude via suresh).
HDFS-2669 Enable protobuf rpc for ClientNamenodeProtocol
OPTIMIZATIONS
HDFS-2477. Optimize computing the diff between a block report and the
namenode state. (Tomasz Nykiel via hairong)
@ -171,6 +182,9 @@ Release 0.23.1 - UNRELEASED
HDFS-2594. Support getDelegationTokens and createSymlink in WebHDFS.
(szetszwo)
HDFS-2545. Change WebHDFS to support multiple namenodes in federation.
(szetszwo)
IMPROVEMENTS
HDFS-2560. Refactor BPOfferService to be a static inner class (todd)
@ -209,6 +223,9 @@ Release 0.23.1 - UNRELEASED
HDFS-2654. Make BlockReaderLocal not extend RemoteBlockReader2. (eli)
HDFS-2675. Reduce warning verbosity when double-closing edit logs
(todd)
OPTIMIZATIONS
HDFS-2130. Switch default checksum to CRC32C. (todd)
@ -245,6 +262,9 @@ Release 0.23.1 - UNRELEASED
HDFS-2653. DFSClient should cache whether addrs are non-local when
short-circuiting is enabled. (eli)
HDFS-2649. eclipse:eclipse build fails for hadoop-hdfs-httpfs.
(Jason Lowe via eli)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -0,0 +1,60 @@
This module provides a BookKeeper backend for HFDS Namenode write
ahead logging.
BookKeeper is a highly available distributed write ahead logging
system. For more details, see
http://zookeeper.apache.org/bookkeeper
-------------------------------------------------------------------------------
How do I build?
To generate the distribution packages for BK journal, do the
following.
$ mvn clean install -Pdist -Dtar
This will generate a tarball,
target/hadoop-hdfs-bkjournal-<VERSION>.tar.gz
-------------------------------------------------------------------------------
How do I use the BookKeeper Journal?
To run a HDFS namenode using BookKeeper as a backend, extract the
distribution package on top of hdfs
cd hadoop-hdfs-<VERSION>/
tar --strip-components 1 -zxvf path/to/hadoop-hdfs-bkjournal-<VERSION>.tar.gz
Then, in hdfs-site.xml, set the following properties.
<property>
<name>dfs.namenode.edits.dir</name>
<value>bookkeeper://localhost:2181/bkjournal,file:///path/for/edits</value>
</property>
<property>
<name>dfs.namenode.edits.journal-plugin.bookkeeper</name>
<value>org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager</value>
</property>
In this example, the namenode is configured to use 2 write ahead
logging devices. One writes to BookKeeper and the other to a local
file system. At the moment is is not possible to only write to
BookKeeper, as the resource checker explicitly checked for local
disks currently.
The given example, configures the namenode to look for the journal
metadata at the path /bkjournal on the a standalone zookeeper ensemble
at localhost:2181. To configure a multiple host zookeeper ensemble,
separate the hosts with semicolons. For example, if you have 3
zookeeper servers, zk1, zk2 & zk3, each listening on port 2181, you
would specify this with
bookkeeper://zk1:2181;zk2:2181;zk3:2181/bkjournal
The final part /bkjournal specifies the znode in zookeeper where
ledger metadata will be store. Administrators can set this to anything
they wish.

View File

@ -0,0 +1,67 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project>
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-project-dist</artifactId>
<version>0.24.0-SNAPSHOT</version>
<relativePath>../../../../hadoop-project-dist</relativePath>
</parent>
<groupId>org.apache.hadoop.contrib</groupId>
<artifactId>hadoop-hdfs-bkjournal</artifactId>
<version>0.24.0-SNAPSHOT</version>
<description>Apache Hadoop HDFS BookKeeper Journal</description>
<name>Apache Hadoop HDFS BookKeeper Journal</name>
<packaging>jar</packaging>
<properties>
<hadoop.component>hdfs</hadoop.component>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>0.24.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>0.24.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>0.24.0-SNAPSHOT</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>bookkeeper-server</artifactId>
<version>4.0.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,221 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Enumeration;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Input stream which reads from a BookKeeper ledger.
*/
class BookKeeperEditLogInputStream extends EditLogInputStream {
static final Log LOG = LogFactory.getLog(BookKeeperEditLogInputStream.class);
private final long firstTxId;
private final long lastTxId;
private final int logVersion;
private final LedgerHandle lh;
private final FSEditLogOp.Reader reader;
private final FSEditLogLoader.PositionTrackingInputStream tracker;
/**
* Construct BookKeeper edit log input stream.
* Starts reading from the first entry of the ledger.
*/
BookKeeperEditLogInputStream(final LedgerHandle lh,
final EditLogLedgerMetadata metadata)
throws IOException {
this(lh, metadata, 0);
}
/**
* Construct BookKeeper edit log input stream.
* Starts reading from firstBookKeeperEntry. This allows the stream
* to take a shortcut during recovery, as it doesn't have to read
* every edit log transaction to find out what the last one is.
*/
BookKeeperEditLogInputStream(LedgerHandle lh, EditLogLedgerMetadata metadata,
long firstBookKeeperEntry)
throws IOException {
this.lh = lh;
this.firstTxId = metadata.getFirstTxId();
this.lastTxId = metadata.getLastTxId();
this.logVersion = metadata.getVersion();
BufferedInputStream bin = new BufferedInputStream(
new LedgerInputStream(lh, firstBookKeeperEntry));
tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
DataInputStream in = new DataInputStream(tracker);
reader = new FSEditLogOp.Reader(in, logVersion);
}
@Override
public long getFirstTxId() throws IOException {
return firstTxId;
}
@Override
public long getLastTxId() throws IOException {
return lastTxId;
}
@Override
public int getVersion() throws IOException {
return logVersion;
}
@Override
public FSEditLogOp readOp() throws IOException {
return reader.readOp();
}
@Override
public void close() throws IOException {
try {
lh.close();
} catch (Exception e) {
throw new IOException("Exception closing ledger", e);
}
}
@Override
public long getPosition() {
return tracker.getPos();
}
@Override
public long length() throws IOException {
return lh.getLength();
}
@Override
public String getName() {
return String.format("BookKeeper[%s,first=%d,last=%d]",
lh.toString(), firstTxId, lastTxId);
}
@Override
public JournalType getType() {
assert (false);
return null;
}
/**
* Input stream implementation which can be used by
* FSEditLogOp.Reader
*/
private static class LedgerInputStream extends InputStream {
private long readEntries;
private InputStream entryStream = null;
private final LedgerHandle lh;
private final long maxEntry;
/**
* Construct ledger input stream
* @param lh the ledger handle to read from
* @param firstBookKeeperEntry ledger entry to start reading from
*/
LedgerInputStream(LedgerHandle lh, long firstBookKeeperEntry)
throws IOException {
this.lh = lh;
readEntries = firstBookKeeperEntry;
try {
maxEntry = lh.getLastAddConfirmed();
} catch (Exception e) {
throw new IOException("Error reading last entry id", e);
}
}
/**
* Get input stream representing next entry in the
* ledger.
* @return input stream, or null if no more entries
*/
private InputStream nextStream() throws IOException {
try {
if (readEntries > maxEntry) {
return null;
}
Enumeration<LedgerEntry> entries
= lh.readEntries(readEntries, readEntries);
readEntries++;
if (entries.hasMoreElements()) {
LedgerEntry e = entries.nextElement();
assert !entries.hasMoreElements();
return e.getEntryInputStream();
}
} catch (Exception e) {
throw new IOException("Error reading entries from bookkeeper", e);
}
return null;
}
@Override
public int read() throws IOException {
byte[] b = new byte[1];
if (read(b, 0, 1) != 1) {
return -1;
} else {
return b[0];
}
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
try {
int read = 0;
if (entryStream == null) {
entryStream = nextStream();
if (entryStream == null) {
return read;
}
}
while (read < len) {
int thisread = entryStream.read(b, off+read, (len-read));
if (thisread == -1) {
entryStream = nextStream();
if (entryStream == null) {
return read;
}
} else {
read += thisread;
}
}
return read;
} catch (IOException e) {
throw e;
}
}
}
}

View File

@ -0,0 +1,177 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CountDownLatch;
import java.util.Arrays;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Writer;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.io.DataOutputBuffer;
import java.io.IOException;
/**
* Output stream for BookKeeper Journal.
* Multiple complete edit log entries are packed into a single bookkeeper
* entry before sending it over the network. The fact that the edit log entries
* are complete in the bookkeeper entries means that each bookkeeper log entry
*can be read as a complete edit log. This is useful for recover, as we don't
* need to read through the entire edit log segment to get the last written
* entry.
*/
class BookKeeperEditLogOutputStream
extends EditLogOutputStream implements AddCallback {
private final DataOutputBuffer bufCurrent;
private final AtomicInteger outstandingRequests;
private final int transmissionThreshold;
private final LedgerHandle lh;
private CountDownLatch syncLatch;
private final WriteLock wl;
private final Writer writer;
/**
* Construct an edit log output stream which writes to a ledger.
*/
protected BookKeeperEditLogOutputStream(Configuration conf,
LedgerHandle lh, WriteLock wl)
throws IOException {
super();
bufCurrent = new DataOutputBuffer();
outstandingRequests = new AtomicInteger(0);
syncLatch = null;
this.lh = lh;
this.wl = wl;
this.wl.acquire();
this.writer = new Writer(bufCurrent);
this.transmissionThreshold
= conf.getInt(BookKeeperJournalManager.BKJM_OUTPUT_BUFFER_SIZE,
BookKeeperJournalManager.BKJM_OUTPUT_BUFFER_SIZE_DEFAULT);
}
@Override
public void create() throws IOException {
// noop
}
@Override
public void close() throws IOException {
setReadyToFlush();
flushAndSync();
try {
lh.close();
} catch (InterruptedException ie) {
throw new IOException("Interrupted waiting on close", ie);
} catch (BKException bke) {
throw new IOException("BookKeeper error during close", bke);
}
}
@Override
public void abort() throws IOException {
try {
lh.close();
} catch (InterruptedException ie) {
throw new IOException("Interrupted waiting on close", ie);
} catch (BKException bke) {
throw new IOException("BookKeeper error during abort", bke);
}
wl.release();
}
@Override
public void writeRaw(final byte[] data, int off, int len) throws IOException {
throw new IOException("Not supported for BK");
}
@Override
public void write(FSEditLogOp op) throws IOException {
wl.checkWriteLock();
writer.writeOp(op);
if (bufCurrent.getLength() > transmissionThreshold) {
transmit();
}
}
@Override
public void setReadyToFlush() throws IOException {
wl.checkWriteLock();
transmit();
synchronized(this) {
syncLatch = new CountDownLatch(outstandingRequests.get());
}
}
@Override
public void flushAndSync() throws IOException {
wl.checkWriteLock();
assert(syncLatch != null);
try {
syncLatch.await();
} catch (InterruptedException ie) {
throw new IOException("Interrupted waiting on latch", ie);
}
syncLatch = null;
// wait for whatever we wait on
}
/**
* Transmit the current buffer to bookkeeper.
* Synchronised at the FSEditLog level. #write() and #setReadyToFlush()
* are never called at the same time.
*/
private void transmit() throws IOException {
wl.checkWriteLock();
if (bufCurrent.getLength() > 0) {
byte[] entry = Arrays.copyOf(bufCurrent.getData(),
bufCurrent.getLength());
lh.asyncAddEntry(entry, this, null);
bufCurrent.reset();
outstandingRequests.incrementAndGet();
}
}
@Override
public void addComplete(int rc, LedgerHandle handle,
long entryId, Object ctx) {
synchronized(this) {
outstandingRequests.decrementAndGet();
CountDownLatch l = syncLatch;
if (l != null) {
l.countDown();
}
}
}
}

View File

@ -0,0 +1,508 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.conf.Configuration;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.io.IOException;
import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* BookKeeper Journal Manager
*
* To use, add the following to hdfs-site.xml.
* <pre>
* {@code
* <property>
* <name>dfs.namenode.edits.dir</name>
* <value>bookkeeper://zk1:2181;zk2:2181;zk3:2181/hdfsjournal</value>
* </property>
*
* <property>
* <name>dfs.namenode.edits.journalPlugin.bookkeeper</name>
* <value>org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager</value>
* </property>
* }
* </pre>
* The URI format for bookkeeper is bookkeeper://[zkEnsemble]/[rootZnode]
* [zookkeeper ensemble] is a list of semi-colon separated, zookeeper host:port
* pairs. In the example above there are 3 servers, in the ensemble,
* zk1, zk2 &amp; zk3, each one listening on port 2181.
*
* [root znode] is the path of the zookeeper znode, under which the editlog
* information will be stored.
*
* Other configuration options are:
* <ul>
* <li><b>dfs.namenode.bookkeeperjournal.output-buffer-size</b>
* Number of bytes a bookkeeper journal stream will buffer before
* forcing a flush. Default is 1024.</li>
* <li><b>dfs.namenode.bookkeeperjournal.ensemble-size</b>
* Number of bookkeeper servers in edit log ledger ensembles. This
* is the number of bookkeeper servers which need to be available
* for the ledger to be writable. Default is 3.</li>
* <li><b>dfs.namenode.bookkeeperjournal.quorum-size</b>
* Number of bookkeeper servers in the write quorum. This is the
* number of bookkeeper servers which must have acknowledged the
* write of an entry before it is considered written.
* Default is 2.</li>
* <li><b>dfs.namenode.bookkeeperjournal.digestPw</b>
* Password to use when creating ledgers. </li>
* </ul>
*/
public class BookKeeperJournalManager implements JournalManager {
static final Log LOG = LogFactory.getLog(BookKeeperJournalManager.class);
public static final String BKJM_OUTPUT_BUFFER_SIZE
= "dfs.namenode.bookkeeperjournal.output-buffer-size";
public static final int BKJM_OUTPUT_BUFFER_SIZE_DEFAULT = 1024;
public static final String BKJM_BOOKKEEPER_ENSEMBLE_SIZE
= "dfs.namenode.bookkeeperjournal.ensemble-size";
public static final int BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT = 3;
public static final String BKJM_BOOKKEEPER_QUORUM_SIZE
= "dfs.namenode.bookkeeperjournal.quorum-size";
public static final int BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT = 2;
public static final String BKJM_BOOKKEEPER_DIGEST_PW
= "dfs.namenode.bookkeeperjournal.digestPw";
public static final String BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT = "";
private static final int BKJM_LAYOUT_VERSION = -1;
private final ZooKeeper zkc;
private final Configuration conf;
private final BookKeeper bkc;
private final WriteLock wl;
private final String ledgerPath;
private final MaxTxId maxTxId;
private final int ensembleSize;
private final int quorumSize;
private final String digestpw;
private final CountDownLatch zkConnectLatch;
private LedgerHandle currentLedger = null;
private int bytesToInt(byte[] b) {
assert b.length >= 4;
return b[0] << 24 | b[1] << 16 | b[2] << 8 | b[3];
}
private byte[] intToBytes(int i) {
return new byte[] {
(byte)(i >> 24),
(byte)(i >> 16),
(byte)(i >> 8),
(byte)(i) };
}
/**
* Construct a Bookkeeper journal manager.
*/
public BookKeeperJournalManager(Configuration conf, URI uri)
throws IOException {
this.conf = conf;
String zkConnect = uri.getAuthority().replace(";", ",");
String zkPath = uri.getPath();
ensembleSize = conf.getInt(BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT);
quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE,
BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT);
ledgerPath = zkPath + "/ledgers";
String maxTxIdPath = zkPath + "/maxtxid";
String lockPath = zkPath + "/lock";
String versionPath = zkPath + "/version";
digestpw = conf.get(BKJM_BOOKKEEPER_DIGEST_PW,
BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT);
try {
zkConnectLatch = new CountDownLatch(1);
zkc = new ZooKeeper(zkConnect, 3000, new ZkConnectionWatcher());
if (!zkConnectLatch.await(6000, TimeUnit.MILLISECONDS)) {
throw new IOException("Error connecting to zookeeper");
}
if (zkc.exists(zkPath, false) == null) {
zkc.create(zkPath, new byte[] {'0'},
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
Stat versionStat = zkc.exists(versionPath, false);
if (versionStat != null) {
byte[] d = zkc.getData(versionPath, false, versionStat);
// There's only one version at the moment
assert bytesToInt(d) == BKJM_LAYOUT_VERSION;
} else {
zkc.create(versionPath, intToBytes(BKJM_LAYOUT_VERSION),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
if (zkc.exists(ledgerPath, false) == null) {
zkc.create(ledgerPath, new byte[] {'0'},
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
bkc = new BookKeeper(new ClientConfiguration(),
zkc);
} catch (Exception e) {
throw new IOException("Error initializing zk", e);
}
wl = new WriteLock(zkc, lockPath);
maxTxId = new MaxTxId(zkc, maxTxIdPath);
}
/**
* Start a new log segment in a BookKeeper ledger.
* First ensure that we have the write lock for this journal.
* Then create a ledger and stream based on that ledger.
* The ledger id is written to the inprogress znode, so that in the
* case of a crash, a recovery process can find the ledger we were writing
* to when we crashed.
* @param txId First transaction id to be written to the stream
*/
@Override
public EditLogOutputStream startLogSegment(long txId) throws IOException {
wl.acquire();
if (txId <= maxTxId.get()) {
throw new IOException("We've already seen " + txId
+ ". A new stream cannot be created with it");
}
if (currentLedger != null) {
throw new IOException("Already writing to a ledger, id="
+ currentLedger.getId());
}
try {
currentLedger = bkc.createLedger(ensembleSize, quorumSize,
BookKeeper.DigestType.MAC,
digestpw.getBytes());
String znodePath = inprogressZNode();
EditLogLedgerMetadata l = new EditLogLedgerMetadata(znodePath,
HdfsConstants.LAYOUT_VERSION, currentLedger.getId(), txId);
/* Write the ledger metadata out to the inprogress ledger znode
* This can fail if for some reason our write lock has
* expired (@see WriteLock) and another process has managed to
* create the inprogress znode.
* In this case, throw an exception. We don't want to continue
* as this would lead to a split brain situation.
*/
l.write(zkc, znodePath);
return new BookKeeperEditLogOutputStream(conf, currentLedger, wl);
} catch (Exception e) {
if (currentLedger != null) {
try {
currentLedger.close();
} catch (Exception e2) {
//log & ignore, an IOException will be thrown soon
LOG.error("Error closing ledger", e2);
}
}
throw new IOException("Error creating ledger", e);
}
}
/**
* Finalize a log segment. If the journal manager is currently
* writing to a ledger, ensure that this is the ledger of the log segment
* being finalized.
*
* Otherwise this is the recovery case. In the recovery case, ensure that
* the firstTxId of the ledger matches firstTxId for the segment we are
* trying to finalize.
*/
@Override
public void finalizeLogSegment(long firstTxId, long lastTxId)
throws IOException {
String inprogressPath = inprogressZNode();
try {
Stat inprogressStat = zkc.exists(inprogressPath, false);
if (inprogressStat == null) {
throw new IOException("Inprogress znode " + inprogressPath
+ " doesn't exist");
}
wl.checkWriteLock();
EditLogLedgerMetadata l
= EditLogLedgerMetadata.read(zkc, inprogressPath);
if (currentLedger != null) { // normal, non-recovery case
if (l.getLedgerId() == currentLedger.getId()) {
try {
currentLedger.close();
} catch (BKException bke) {
LOG.error("Error closing current ledger", bke);
}
currentLedger = null;
} else {
throw new IOException(
"Active ledger has different ID to inprogress. "
+ l.getLedgerId() + " found, "
+ currentLedger.getId() + " expected");
}
}
if (l.getFirstTxId() != firstTxId) {
throw new IOException("Transaction id not as expected, "
+ l.getFirstTxId() + " found, " + firstTxId + " expected");
}
l.finalizeLedger(lastTxId);
String finalisedPath = finalizedLedgerZNode(firstTxId, lastTxId);
try {
l.write(zkc, finalisedPath);
} catch (KeeperException.NodeExistsException nee) {
if (!l.verify(zkc, finalisedPath)) {
throw new IOException("Node " + finalisedPath + " already exists"
+ " but data doesn't match");
}
}
maxTxId.store(lastTxId);
zkc.delete(inprogressPath, inprogressStat.getVersion());
} catch (KeeperException e) {
throw new IOException("Error finalising ledger", e);
} catch (InterruptedException ie) {
throw new IOException("Error finalising ledger", ie);
} finally {
wl.release();
}
}
@Override
public EditLogInputStream getInputStream(long fromTxnId) throws IOException {
for (EditLogLedgerMetadata l : getLedgerList()) {
if (l.getFirstTxId() == fromTxnId) {
try {
LedgerHandle h = bkc.openLedger(l.getLedgerId(),
BookKeeper.DigestType.MAC,
digestpw.getBytes());
return new BookKeeperEditLogInputStream(h, l);
} catch (Exception e) {
throw new IOException("Could not open ledger for " + fromTxnId, e);
}
}
}
throw new IOException("No ledger for fromTxnId " + fromTxnId + " found.");
}
@Override
public long getNumberOfTransactions(long fromTxnId) throws IOException {
long count = 0;
long expectedStart = 0;
for (EditLogLedgerMetadata l : getLedgerList()) {
if (l.isInProgress()) {
long endTxId = recoverLastTxId(l);
if (endTxId == HdfsConstants.INVALID_TXID) {
break;
}
count += (endTxId - l.getFirstTxId()) + 1;
break;
}
if (l.getFirstTxId() < fromTxnId) {
continue;
} else if (l.getFirstTxId() == fromTxnId) {
count = (l.getLastTxId() - l.getFirstTxId()) + 1;
expectedStart = l.getLastTxId() + 1;
} else {
if (expectedStart != l.getFirstTxId()) {
if (count == 0) {
throw new CorruptionException("StartTxId " + l.getFirstTxId()
+ " is not as expected " + expectedStart
+ ". Gap in transaction log?");
} else {
break;
}
}
count += (l.getLastTxId() - l.getFirstTxId()) + 1;
expectedStart = l.getLastTxId() + 1;
}
}
return count;
}
@Override
public void recoverUnfinalizedSegments() throws IOException {
wl.acquire();
synchronized (this) {
try {
EditLogLedgerMetadata l
= EditLogLedgerMetadata.read(zkc, inprogressZNode());
long endTxId = recoverLastTxId(l);
if (endTxId == HdfsConstants.INVALID_TXID) {
LOG.error("Unrecoverable corruption has occurred in segment "
+ l.toString() + " at path " + inprogressZNode()
+ ". Unable to continue recovery.");
throw new IOException("Unrecoverable corruption, please check logs.");
}
finalizeLogSegment(l.getFirstTxId(), endTxId);
} catch (KeeperException.NoNodeException nne) {
// nothing to recover, ignore
} finally {
if (wl.haveLock()) {
wl.release();
}
}
}
}
@Override
public void purgeLogsOlderThan(long minTxIdToKeep)
throws IOException {
for (EditLogLedgerMetadata l : getLedgerList()) {
if (!l.isInProgress()
&& l.getLastTxId() < minTxIdToKeep) {
try {
Stat stat = zkc.exists(l.getZkPath(), false);
zkc.delete(l.getZkPath(), stat.getVersion());
bkc.deleteLedger(l.getLedgerId());
} catch (InterruptedException ie) {
LOG.error("Interrupted while purging " + l, ie);
} catch (BKException bke) {
LOG.error("Couldn't delete ledger from bookkeeper", bke);
} catch (KeeperException ke) {
LOG.error("Error deleting ledger entry in zookeeper", ke);
}
}
}
}
@Override
public void close() throws IOException {
try {
bkc.close();
zkc.close();
} catch (Exception e) {
throw new IOException("Couldn't close zookeeper client", e);
}
}
/**
* Set the amount of memory that this stream should use to buffer edits.
* Setting this will only affect future output stream. Streams
* which have currently be created won't be affected.
*/
@Override
public void setOutputBufferCapacity(int size) {
conf.getInt(BKJM_OUTPUT_BUFFER_SIZE, size);
}
/**
* Find the id of the last edit log transaction writen to a edit log
* ledger.
*/
private long recoverLastTxId(EditLogLedgerMetadata l) throws IOException {
try {
LedgerHandle lh = bkc.openLedger(l.getLedgerId(),
BookKeeper.DigestType.MAC,
digestpw.getBytes());
long lastAddConfirmed = lh.getLastAddConfirmed();
BookKeeperEditLogInputStream in
= new BookKeeperEditLogInputStream(lh, l, lastAddConfirmed);
long endTxId = HdfsConstants.INVALID_TXID;
FSEditLogOp op = in.readOp();
while (op != null) {
if (endTxId == HdfsConstants.INVALID_TXID
|| op.getTransactionId() == endTxId+1) {
endTxId = op.getTransactionId();
}
op = in.readOp();
}
return endTxId;
} catch (Exception e) {
throw new IOException("Exception retreiving last tx id for ledger " + l,
e);
}
}
/**
* Get a list of all segments in the journal.
*/
private List<EditLogLedgerMetadata> getLedgerList() throws IOException {
List<EditLogLedgerMetadata> ledgers
= new ArrayList<EditLogLedgerMetadata>();
try {
List<String> ledgerNames = zkc.getChildren(ledgerPath, false);
for (String n : ledgerNames) {
ledgers.add(EditLogLedgerMetadata.read(zkc, ledgerPath + "/" + n));
}
} catch (Exception e) {
throw new IOException("Exception reading ledger list from zk", e);
}
Collections.sort(ledgers, EditLogLedgerMetadata.COMPARATOR);
return ledgers;
}
/**
* Get the znode path for a finalize ledger
*/
String finalizedLedgerZNode(long startTxId, long endTxId) {
return String.format("%s/edits_%018d_%018d",
ledgerPath, startTxId, endTxId);
}
/**
* Get the znode path for the inprogressZNode
*/
String inprogressZNode() {
return ledgerPath + "/inprogress";
}
/**
* Simple watcher to notify when zookeeper has connected
*/
private class ZkConnectionWatcher implements Watcher {
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected.equals(event.getState())) {
zkConnectLatch.countDown();
}
}
}
}

View File

@ -0,0 +1,200 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import java.io.IOException;
import java.util.Comparator;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Utility class for storing the metadata associated
* with a single edit log segment, stored in a single ledger
*/
public class EditLogLedgerMetadata {
static final Log LOG = LogFactory.getLog(EditLogLedgerMetadata.class);
private String zkPath;
private final long ledgerId;
private final int version;
private final long firstTxId;
private long lastTxId;
private boolean inprogress;
public static final Comparator COMPARATOR
= new Comparator<EditLogLedgerMetadata>() {
public int compare(EditLogLedgerMetadata o1,
EditLogLedgerMetadata o2) {
if (o1.firstTxId < o2.firstTxId) {
return -1;
} else if (o1.firstTxId == o2.firstTxId) {
return 0;
} else {
return 1;
}
}
};
EditLogLedgerMetadata(String zkPath, int version,
long ledgerId, long firstTxId) {
this.zkPath = zkPath;
this.ledgerId = ledgerId;
this.version = version;
this.firstTxId = firstTxId;
this.lastTxId = HdfsConstants.INVALID_TXID;
this.inprogress = true;
}
EditLogLedgerMetadata(String zkPath, int version, long ledgerId,
long firstTxId, long lastTxId) {
this.zkPath = zkPath;
this.ledgerId = ledgerId;
this.version = version;
this.firstTxId = firstTxId;
this.lastTxId = lastTxId;
this.inprogress = false;
}
String getZkPath() {
return zkPath;
}
long getFirstTxId() {
return firstTxId;
}
long getLastTxId() {
return lastTxId;
}
long getLedgerId() {
return ledgerId;
}
int getVersion() {
return version;
}
boolean isInProgress() {
return this.inprogress;
}
void finalizeLedger(long newLastTxId) {
assert this.lastTxId == HdfsConstants.INVALID_TXID;
this.lastTxId = newLastTxId;
this.inprogress = false;
}
static EditLogLedgerMetadata read(ZooKeeper zkc, String path)
throws IOException, KeeperException.NoNodeException {
try {
byte[] data = zkc.getData(path, false, null);
String[] parts = new String(data).split(";");
if (parts.length == 3) {
int version = Integer.valueOf(parts[0]);
long ledgerId = Long.valueOf(parts[1]);
long txId = Long.valueOf(parts[2]);
return new EditLogLedgerMetadata(path, version, ledgerId, txId);
} else if (parts.length == 4) {
int version = Integer.valueOf(parts[0]);
long ledgerId = Long.valueOf(parts[1]);
long firstTxId = Long.valueOf(parts[2]);
long lastTxId = Long.valueOf(parts[3]);
return new EditLogLedgerMetadata(path, version, ledgerId,
firstTxId, lastTxId);
} else {
throw new IOException("Invalid ledger entry, "
+ new String(data));
}
} catch(KeeperException.NoNodeException nne) {
throw nne;
} catch(Exception e) {
throw new IOException("Error reading from zookeeper", e);
}
}
void write(ZooKeeper zkc, String path)
throws IOException, KeeperException.NodeExistsException {
this.zkPath = path;
String finalisedData;
if (inprogress) {
finalisedData = String.format("%d;%d;%d",
version, ledgerId, firstTxId);
} else {
finalisedData = String.format("%d;%d;%d;%d",
version, ledgerId, firstTxId, lastTxId);
}
try {
zkc.create(path, finalisedData.getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException nee) {
throw nee;
} catch (Exception e) {
throw new IOException("Error creating ledger znode");
}
}
boolean verify(ZooKeeper zkc, String path) {
try {
EditLogLedgerMetadata other = read(zkc, path);
if (LOG.isTraceEnabled()) {
LOG.trace("Verifying " + this.toString()
+ " against " + other);
}
return other == this;
} catch (Exception e) {
LOG.error("Couldn't verify data in " + path, e);
return false;
}
}
public boolean equals(Object o) {
if (!(o instanceof EditLogLedgerMetadata)) {
return false;
}
EditLogLedgerMetadata ol = (EditLogLedgerMetadata)o;
return ledgerId == ol.ledgerId
&& firstTxId == ol.firstTxId
&& lastTxId == ol.lastTxId
&& version == ol.version;
}
public int hashCode() {
int hash = 1;
hash = hash * 31 + (int)ledgerId;
hash = hash * 31 + (int)firstTxId;
hash = hash * 31 + (int)lastTxId;
hash = hash * 31 + (int)version;
return hash;
}
public String toString() {
return "[LedgerId:"+ledgerId +
", firstTxId:" + firstTxId +
", lastTxId:" + lastTxId +
", version:" + version + "]";
}
}

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import java.io.IOException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Utility class for storing and reading
* the max seen txid in zookeeper
*/
class MaxTxId {
static final Log LOG = LogFactory.getLog(MaxTxId.class);
private final ZooKeeper zkc;
private final String path;
private Stat currentStat;
MaxTxId(ZooKeeper zkc, String path) {
this.zkc = zkc;
this.path = path;
}
synchronized void store(long maxTxId) throws IOException {
long currentMax = get();
if (currentMax < maxTxId) {
if (LOG.isTraceEnabled()) {
LOG.trace("Setting maxTxId to " + maxTxId);
}
String txidStr = Long.toString(maxTxId);
try {
if (currentStat != null) {
currentStat = zkc.setData(path, txidStr.getBytes("UTF-8"),
currentStat.getVersion());
} else {
zkc.create(path, txidStr.getBytes("UTF-8"),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
throw new IOException("Error writing max tx id", e);
}
}
}
synchronized long get() throws IOException {
try {
currentStat = zkc.exists(path, false);
if (currentStat == null) {
return 0;
} else {
byte[] bytes = zkc.getData(path, false, currentStat);
String txidString = new String(bytes, "UTF-8");
return Long.valueOf(txidString);
}
} catch (Exception e) {
throw new IOException("Error reading the max tx id from zk", e);
}
}
}

View File

@ -0,0 +1,186 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.List;
import java.util.Collections;
import java.util.Comparator;
import java.net.InetAddress;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Distributed lock, using ZooKeeper.
*
* The lock is vulnerable to timing issues. For example, the process could
* encounter a really long GC cycle between acquiring the lock, and writing to
* a ledger. This could have timed out the lock, and another process could have
* acquired the lock and started writing to bookkeeper. Therefore other
* mechanisms are required to ensure correctness (i.e. Fencing).
*/
class WriteLock implements Watcher {
static final Log LOG = LogFactory.getLog(WriteLock.class);
private final ZooKeeper zkc;
private final String lockpath;
private AtomicInteger lockCount = new AtomicInteger(0);
private String myznode = null;
WriteLock(ZooKeeper zkc, String lockpath) throws IOException {
this.lockpath = lockpath;
this.zkc = zkc;
try {
if (zkc.exists(lockpath, false) == null) {
String localString = InetAddress.getLocalHost().toString();
zkc.create(lockpath, localString.getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
throw new IOException("Exception accessing Zookeeper", e);
}
}
void acquire() throws IOException {
while (true) {
if (lockCount.get() == 0) {
try {
synchronized(this) {
if (lockCount.get() > 0) {
lockCount.incrementAndGet();
return;
}
myznode = zkc.create(lockpath + "/lock-", new byte[] {'0'},
Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
if (LOG.isTraceEnabled()) {
LOG.trace("Acquiring lock, trying " + myznode);
}
List<String> nodes = zkc.getChildren(lockpath, false);
Collections.sort(nodes, new Comparator<String>() {
public int compare(String o1,
String o2) {
Integer l1 = Integer.valueOf(o1.replace("lock-", ""));
Integer l2 = Integer.valueOf(o2.replace("lock-", ""));
return l1 - l2;
}
});
if ((lockpath + "/" + nodes.get(0)).equals(myznode)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Lock acquired - " + myznode);
}
lockCount.set(1);
zkc.exists(myznode, this);
return;
} else {
LOG.error("Failed to acquire lock with " + myznode
+ ", " + nodes.get(0) + " already has it");
throw new IOException("Could not acquire lock");
}
}
} catch (KeeperException e) {
throw new IOException("Exception accessing Zookeeper", e);
} catch (InterruptedException ie) {
throw new IOException("Exception accessing Zookeeper", ie);
}
} else {
int ret = lockCount.getAndIncrement();
if (ret == 0) {
lockCount.decrementAndGet();
continue; // try again;
} else {
return;
}
}
}
}
void release() throws IOException {
try {
if (lockCount.decrementAndGet() <= 0) {
if (lockCount.get() < 0) {
LOG.warn("Unbalanced lock handling somewhere, lockCount down to "
+ lockCount.get());
}
synchronized(this) {
if (lockCount.get() <= 0) {
if (LOG.isTraceEnabled()) {
LOG.trace("releasing lock " + myznode);
}
if (myznode != null) {
zkc.delete(myznode, -1);
myznode = null;
}
}
}
}
} catch (Exception e) {
throw new IOException("Exception accessing Zookeeper", e);
}
}
public void checkWriteLock() throws IOException {
if (!haveLock()) {
throw new IOException("Lost writer lock");
}
}
boolean haveLock() throws IOException {
return lockCount.get() > 0;
}
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.Disconnected
|| event.getState() == KeeperState.Expired) {
LOG.warn("Lost zookeeper session, lost lock ");
lockCount.set(0);
} else {
// reapply the watch
synchronized (this) {
LOG.info("Zookeeper event " + event
+ " received, reapplying watch to " + myznode);
if (myznode != null) {
try {
zkc.exists(myznode, this);
} catch (Exception e) {
LOG.warn("Could not set watch on lock, releasing", e);
try {
release();
} catch (IOException ioe) {
LOG.error("Could not release Zk lock", ioe);
}
}
}
}
}
}
}

View File

@ -0,0 +1,395 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import static org.junit.Assert.*;
import java.net.URI;
import java.util.Collections;
import java.util.Arrays;
import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.util.LocalBookKeeper;
import java.io.RandomAccessFile;
import java.io.File;
import java.io.FilenameFilter;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.security.SecurityUtil;
import org.junit.Test;
import org.junit.Before;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.AfterClass;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.setupEdits;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.KeeperException;
import com.google.common.collect.ImmutableList;
import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class TestBookKeeperJournalManager {
static final Log LOG = LogFactory.getLog(TestBookKeeperJournalManager.class);
private static final long DEFAULT_SEGMENT_SIZE = 1000;
private static final String zkEnsemble = "localhost:2181";
private static Thread bkthread;
protected static Configuration conf = new Configuration();
private ZooKeeper zkc;
private static ZooKeeper connectZooKeeper(String ensemble)
throws IOException, KeeperException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
ZooKeeper zkc = new ZooKeeper(zkEnsemble, 3600, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
latch.countDown();
}
}
});
if (!latch.await(3, TimeUnit.SECONDS)) {
throw new IOException("Zookeeper took too long to connect");
}
return zkc;
}
@BeforeClass
public static void setupBookkeeper() throws Exception {
final int numBookies = 5;
bkthread = new Thread() {
public void run() {
try {
String[] args = new String[1];
args[0] = String.valueOf(numBookies);
LOG.info("Starting bk");
LocalBookKeeper.main(args);
} catch (InterruptedException e) {
// go away quietly
} catch (Exception e) {
LOG.error("Error starting local bk", e);
}
}
};
bkthread.start();
if (!LocalBookKeeper.waitForServerUp(zkEnsemble, 10000)) {
throw new Exception("Error starting zookeeper/bookkeeper");
}
ZooKeeper zkc = connectZooKeeper(zkEnsemble);
try {
boolean up = false;
for (int i = 0; i < 10; i++) {
try {
List<String> children = zkc.getChildren("/ledgers/available",
false);
if (children.size() == numBookies) {
up = true;
break;
}
} catch (KeeperException e) {
// ignore
}
Thread.sleep(1000);
}
if (!up) {
throw new IOException("Not enough bookies started");
}
} finally {
zkc.close();
}
}
@Before
public void setup() throws Exception {
zkc = connectZooKeeper(zkEnsemble);
}
@After
public void teardown() throws Exception {
zkc.close();
}
@AfterClass
public static void teardownBookkeeper() throws Exception {
if (bkthread != null) {
bkthread.interrupt();
bkthread.join();
}
}
@Test
public void testSimpleWrite() throws Exception {
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplewrite"));
long txid = 1;
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
out.write(op);
}
out.close();
bkjm.finalizeLogSegment(1, 100);
String zkpath = bkjm.finalizedLedgerZNode(1, 100);
assertNotNull(zkc.exists(zkpath, false));
assertNull(zkc.exists(bkjm.inprogressZNode(), false));
}
@Test
public void testNumberOfTransactions() throws Exception {
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-txncount"));
long txid = 1;
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
out.write(op);
}
out.close();
bkjm.finalizeLogSegment(1, 100);
long numTrans = bkjm.getNumberOfTransactions(1);
assertEquals(100, numTrans);
}
@Test
public void testNumberOfTransactionsWithGaps() throws Exception {
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-gaps"));
long txid = 1;
for (long i = 0; i < 3; i++) {
long start = txid;
EditLogOutputStream out = bkjm.startLogSegment(start);
for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(txid++);
out.write(op);
}
out.close();
bkjm.finalizeLogSegment(start, txid-1);
assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false));
}
zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1, DEFAULT_SEGMENT_SIZE*2), -1);
long numTrans = bkjm.getNumberOfTransactions(1);
assertEquals(DEFAULT_SEGMENT_SIZE, numTrans);
try {
numTrans = bkjm.getNumberOfTransactions(DEFAULT_SEGMENT_SIZE+1);
fail("Should have thrown corruption exception by this point");
} catch (JournalManager.CorruptionException ce) {
// if we get here, everything is going good
}
numTrans = bkjm.getNumberOfTransactions((DEFAULT_SEGMENT_SIZE*2)+1);
assertEquals(DEFAULT_SEGMENT_SIZE, numTrans);
}
@Test
public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception {
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-inprogressAtEnd"));
long txid = 1;
for (long i = 0; i < 3; i++) {
long start = txid;
EditLogOutputStream out = bkjm.startLogSegment(start);
for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(txid++);
out.write(op);
}
out.close();
bkjm.finalizeLogSegment(start, (txid-1));
assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false));
}
long start = txid;
EditLogOutputStream out = bkjm.startLogSegment(start);
for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE/2; j++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(txid++);
out.write(op);
}
out.setReadyToFlush();
out.flush();
out.abort();
out.close();
long numTrans = bkjm.getNumberOfTransactions(1);
assertEquals((txid-1), numTrans);
}
/**
* Create a bkjm namespace, write a journal from txid 1, close stream.
* Try to create a new journal from txid 1. Should throw an exception.
*/
@Test
public void testWriteRestartFrom1() throws Exception {
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-restartFrom1"));
long txid = 1;
long start = txid;
EditLogOutputStream out = bkjm.startLogSegment(txid);
for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(txid++);
out.write(op);
}
out.close();
bkjm.finalizeLogSegment(start, (txid-1));
txid = 1;
try {
out = bkjm.startLogSegment(txid);
fail("Shouldn't be able to start another journal from " + txid
+ " when one already exists");
} catch (Exception ioe) {
LOG.info("Caught exception as expected", ioe);
}
// test border case
txid = DEFAULT_SEGMENT_SIZE;
try {
out = bkjm.startLogSegment(txid);
fail("Shouldn't be able to start another journal from " + txid
+ " when one already exists");
} catch (IOException ioe) {
LOG.info("Caught exception as expected", ioe);
}
// open journal continuing from before
txid = DEFAULT_SEGMENT_SIZE + 1;
start = txid;
out = bkjm.startLogSegment(start);
assertNotNull(out);
for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(txid++);
out.write(op);
}
out.close();
bkjm.finalizeLogSegment(start, (txid-1));
// open journal arbitarily far in the future
txid = DEFAULT_SEGMENT_SIZE * 4;
out = bkjm.startLogSegment(txid);
assertNotNull(out);
}
@Test
public void testTwoWriters() throws Exception {
long start = 1;
BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf,
URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter"));
BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf,
URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter"));
EditLogOutputStream out1 = bkjm1.startLogSegment(start);
try {
EditLogOutputStream out2 = bkjm2.startLogSegment(start);
fail("Shouldn't have been able to open the second writer");
} catch (IOException ioe) {
LOG.info("Caught exception as expected", ioe);
}
}
@Test
public void testSimpleRead() throws Exception {
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simpleread"));
long txid = 1;
final long numTransactions = 10000;
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1 ; i <= numTransactions; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
out.write(op);
}
out.close();
bkjm.finalizeLogSegment(1, numTransactions);
EditLogInputStream in = bkjm.getInputStream(1);
try {
assertEquals(numTransactions,
FSEditLogTestUtil.countTransactionsInStream(in));
} finally {
in.close();
}
}
@Test
public void testSimpleRecovery() throws Exception {
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplerecovery"));
EditLogOutputStream out = bkjm.startLogSegment(1);
long txid = 1;
for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
out.write(op);
}
out.setReadyToFlush();
out.flush();
out.abort();
out.close();
assertNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false));
assertNotNull(zkc.exists(bkjm.inprogressZNode(), false));
bkjm.recoverUnfinalizedSegments();
assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false));
assertNull(zkc.exists(bkjm.inprogressZNode(), false));
}
}

View File

@ -15,29 +15,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import org.junit.Test;
/** Test for simple signs of life using Avro RPC. Not an exhaustive test
* yet, just enough to catch fundamental problems using Avro reflection to
* infer namenode RPC protocols. */
public class TestDfsOverAvroRpc extends TestLocalDFS {
@Test(timeout=20000)
public void testWorkingDirectory() throws IOException {
/*
Test turned off - see HDFS-2647 and HDFS-2660 for related comments.
This test can be turned on when Avro RPC is enabled using mechanism
similar to protobuf.
*/
/*
System.setProperty("hdfs.rpc.engine",
"org.apache.hadoop.ipc.AvroRpcEngine");
super.testWorkingDirectory();
/**
* Utilities for testing edit logs
*/
public class FSEditLogTestUtil {
public static FSEditLogOp getNoOpInstance() {
return FSEditLogOp.LogSegmentOp.getInstance(FSEditLogOpCodes.OP_END_LOG_SEGMENT);
}
public static long countTransactionsInStream(EditLogInputStream in)
throws IOException {
FSEditLogLoader.EditLogValidation validation = FSEditLogLoader.validateEditLog(in);
return validation.getNumTransactions();
}
}

View File

@ -0,0 +1,62 @@
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#
#
# Bookkeeper Journal Logging Configuration
#
# Format is "<default threshold> (, <appender>)+
# DEFAULT: console appender only
log4j.rootLogger=OFF, CONSOLE
# Example with rolling log file
#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE
# Example with rolling log file and tracing
#log4j.rootLogger=TRACE, CONSOLE, ROLLINGFILE, TRACEFILE
#
# Log INFO level and above messages to the console
#
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Threshold=INFO
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
#
# Add ROLLINGFILE to rootLogger to get log file output
# Log DEBUG level and above messages to a log file
log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
log4j.appender.ROLLINGFILE.Threshold=DEBUG
log4j.appender.ROLLINGFILE.File=hdfs-namenode.log
log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
# Max log file size of 10MB
log4j.appender.ROLLINGFILE.MaxFileSize=10MB
# uncomment the next line to limit number of backup files
#log4j.appender.ROLLINGFILE.MaxBackupIndex=10
log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n

View File

@ -761,12 +761,12 @@ public class DFSUtil {
Configuration conf, UserGroupInformation ugi) throws IOException {
/**
* Currently we have simply burnt-in support for a SINGLE
* protocol - protocolR23Compatible. This will be replaced
* protocol - protocolPB. This will be replaced
* by a way to pick the right protocol based on the
* version of the target server.
*/
return new org.apache.hadoop.hdfs.protocolR23Compatible.
ClientNamenodeProtocolTranslatorR23(nameNodeAddr, conf, ugi);
return new org.apache.hadoop.hdfs.protocolPB.
ClientNamenodeProtocolTranslatorPB(nameNodeAddr, conf, ugi);
}
/** Create a {@link ClientDatanodeProtocol} proxy */

View File

@ -20,8 +20,6 @@ package org.apache.hadoop.hdfs.protocol;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.avro.reflect.Nullable;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ContentSummary;
@ -117,7 +115,6 @@ public interface ClientProtocol extends VersionedProtocol {
* @throws UnresolvedLinkException If <code>src</code> contains a symlink
* @throws IOException If an I/O error occurred
*/
@Nullable
@Idempotent
public LocatedBlocks getBlockLocations(String src,
long offset,
@ -317,7 +314,7 @@ public interface ClientProtocol extends VersionedProtocol {
* @throws IOException If an I/O error occurred
*/
public LocatedBlock addBlock(String src, String clientName,
@Nullable ExtendedBlock previous, @Nullable DatanodeInfo[] excludeNodes)
ExtendedBlock previous, DatanodeInfo[] excludeNodes)
throws AccessControlException, FileNotFoundException,
NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
IOException;
@ -706,7 +703,6 @@ public interface ClientProtocol extends VersionedProtocol {
* @return upgrade status information or null if no upgrades are in progress
* @throws IOException
*/
@Nullable
//TODO(HA): Should this be @Idempotent?
public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action)
throws IOException;
@ -754,7 +750,6 @@ public interface ClientProtocol extends VersionedProtocol {
* @throws UnresolvedLinkException if the path contains a symlink.
* @throws IOException If an I/O error occurred
*/
@Nullable
@Idempotent
public HdfsFileStatus getFileInfo(String src) throws AccessControlException,
FileNotFoundException, UnresolvedLinkException, IOException;

View File

@ -36,8 +36,6 @@ import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.StringUtils;
import org.apache.avro.reflect.Nullable;
/**
* DatanodeInfo represents the status of a DataNode.
* This object is used for communication in the
@ -57,7 +55,6 @@ public class DatanodeInfo extends DatanodeID implements Node {
/** HostName as supplied by the datanode during registration as its
* name. Namenode uses datanode IP address as the name.
*/
@Nullable
protected String hostName = null;
// administrative states of a datanode
@ -84,10 +81,8 @@ public class DatanodeInfo extends DatanodeID implements Node {
}
}
@Nullable
protected AdminStates adminState;
public DatanodeInfo() {
super();
adminState = null;

View File

@ -31,8 +31,6 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
import org.apache.avro.reflect.Nullable;
/** Interface that represents the over the wire information for a file.
*/
@InterfaceAudience.Private
@ -47,7 +45,6 @@ public class HdfsFileStatus implements Writable {
}
private byte[] path; // local name of the inode that's encoded in java UTF8
@Nullable
private byte[] symlink; // symlink target encoded in java UTF8 or null
private long length;
private boolean isdir;

View File

@ -31,8 +31,6 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
import org.apache.avro.reflect.Nullable;
/**
* Collection of blocks with their locations and the file length.
*/
@ -42,7 +40,6 @@ public class LocatedBlocks implements Writable {
private long fileLength;
private List<LocatedBlock> blocks; // array of blocks with prioritized locations
private boolean underConstruction;
@Nullable
private LocatedBlock lastLocatedBlock = null;
private boolean isLastBlockComplete = false;

View File

@ -19,11 +19,16 @@ package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@ -124,9 +129,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CorruptFileBlocksProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DirectoryListingProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.HdfsFileStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.UpgradeStatusReportProto;
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.io.Text;
@ -218,9 +221,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
Builder builder = GetBlockLocationsResponseProto
.newBuilder();
if (b != null) {
builder.setLocations(
PBHelper.convert(server.getBlockLocations(req.getSrc(),
req.getOffset(), req.getLength()))).build();
builder.setLocations(PBHelper.convert(b)).build();
}
return builder.build();
} catch (IOException e) {
@ -233,14 +234,19 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
RpcController controller, GetServerDefaultsRequestProto req)
throws ServiceException {
try {
FsServerDefaults result = server.getServerDefaults();
return GetServerDefaultsResponseProto.newBuilder()
.setServerDefaults(PBHelper.convert(server.getServerDefaults()))
.setServerDefaults(PBHelper.convert(result))
.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
static final CreateResponseProto VOID_CREATE_RESPONSE =
CreateResponseProto.newBuilder().build();
@Override
public CreateResponseProto create(RpcController controller,
CreateRequestProto req) throws ServiceException {
@ -252,19 +258,22 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
} catch (IOException e) {
throw new ServiceException(e);
}
return CreateResponseProto.newBuilder().build();
return VOID_CREATE_RESPONSE;
}
static final AppendResponseProto NULL_APPEND_RESPONSE =
AppendResponseProto.newBuilder().build();
@Override
public AppendResponseProto append(RpcController controller,
AppendRequestProto req) throws ServiceException {
try {
return AppendResponseProto
.newBuilder()
.setBlock(
PBHelper.convert(server.append(req.getSrc(), req.getClientName())))
.build();
LocatedBlock result = server.append(req.getSrc(), req.getClientName());
if (result != null) {
return AppendResponseProto.newBuilder()
.setBlock(PBHelper.convert(result)).build();
}
return NULL_APPEND_RESPONSE;
} catch (IOException e) {
throw new ServiceException(e);
}
@ -274,18 +283,16 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
public SetReplicationResponseProto setReplication(RpcController controller,
SetReplicationRequestProto req) throws ServiceException {
try {
return SetReplicationResponseProto
.newBuilder()
.setResult(
server.setReplication(req.getSrc(), (short) req.getReplication()))
.build();
boolean result =
server.setReplication(req.getSrc(), (short) req.getReplication());
return SetReplicationResponseProto.newBuilder().setResult(result).build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
static final SetPermissionResponseProto SET_PERM_RESPONSE =
static final SetPermissionResponseProto VOID_SET_PERM_RESPONSE =
SetPermissionResponseProto.newBuilder().build();
@Override
@ -296,24 +303,26 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
} catch (IOException e) {
throw new ServiceException(e);
}
return SET_PERM_RESPONSE;
return VOID_SET_PERM_RESPONSE;
}
static final SetOwnerResponseProto SET_OWNER_RESPONSE =
static final SetOwnerResponseProto VOID_SET_OWNER_RESPONSE =
SetOwnerResponseProto.newBuilder().build();
@Override
public SetOwnerResponseProto setOwner(RpcController controller,
SetOwnerRequestProto req) throws ServiceException {
try {
server.setOwner(req.getSrc(), req.getUsername(), req.getGroupname());
server.setOwner(req.getSrc(),
req.hasUsername() ? req.getUsername() : null,
req.hasGroupname() ? req.getGroupname() : null);
} catch (IOException e) {
throw new ServiceException(e);
}
return SET_OWNER_RESPONSE;
return VOID_SET_OWNER_RESPONSE;
}
static final AbandonBlockResponseProto ABD_BLOCK_RESPONSE =
static final AbandonBlockResponseProto VOID_ADD_BLOCK_RESPONSE =
AbandonBlockResponseProto.newBuilder().build();
@Override
@ -325,20 +334,22 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
} catch (IOException e) {
throw new ServiceException(e);
}
return ABD_BLOCK_RESPONSE;
return VOID_ADD_BLOCK_RESPONSE;
}
@Override
public AddBlockResponseProto addBlock(RpcController controller,
AddBlockRequestProto req) throws ServiceException {
try {
return AddBlockResponseProto.newBuilder().setBlock(
PBHelper.convert(
server.addBlock(req.getSrc(), req.getClientName(),
List<DatanodeInfoProto> excl = req.getExcludeNodesList();
LocatedBlock result = server.addBlock(req.getSrc(), req.getClientName(),
req.hasPrevious() ? PBHelper.convert(req.getPrevious()) : null,
PBHelper.convert(
(DatanodeInfoProto[]) req.getExcludeNodesList().toArray()))))
.build();
(excl == null ||
excl.size() == 0) ? null :
PBHelper.convert(excl.toArray(new DatanodeInfoProto[excl.size()])));
return AddBlockResponseProto.newBuilder().setBlock(
PBHelper.convert(result)).build();
} catch (IOException e) {
throw new ServiceException(e);
}
@ -349,15 +360,17 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
RpcController controller, GetAdditionalDatanodeRequestProto req)
throws ServiceException {
try {
List<DatanodeInfoProto> existingList = req.getExistingsList();
List<DatanodeInfoProto> excludesList = req.getExcludesList();
LocatedBlock result = server.getAdditionalDatanode(
req.getSrc(), PBHelper.convert(req.getBlk()),
PBHelper.convert(existingList.toArray(
new DatanodeInfoProto[existingList.size()])),
PBHelper.convert(excludesList.toArray(
new DatanodeInfoProto[excludesList.size()])),
req.getNumAdditionalNodes(), req.getClientName());
return GetAdditionalDatanodeResponseProto.newBuilder().setBlock(
PBHelper.convert(
server.getAdditionalDatanode(req.getSrc(),
PBHelper.convert(req.getBlk()),
PBHelper.convert((DatanodeInfoProto[]) req.getExistingsList()
.toArray()), PBHelper
.convert((DatanodeInfoProto[]) req.getExcludesList()
.toArray()), req.getNumAdditionalNodes(), req
.getClientName())))
PBHelper.convert(result))
.build();
} catch (IOException e) {
throw new ServiceException(e);
@ -368,10 +381,10 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
public CompleteResponseProto complete(RpcController controller,
CompleteRequestProto req) throws ServiceException {
try {
return CompleteResponseProto.newBuilder().setResult(
boolean result =
server.complete(req.getSrc(), req.getClientName(),
PBHelper.convert(req.getLast())))
.build();
req.hasLast() ? PBHelper.convert(req.getLast()) : null);
return CompleteResponseProto.newBuilder().setResult(result).build();
} catch (IOException e) {
throw new ServiceException(e);
}
@ -384,8 +397,9 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
public ReportBadBlocksResponseProto reportBadBlocks(RpcController controller,
ReportBadBlocksRequestProto req) throws ServiceException {
try {
List<LocatedBlockProto> bl = req.getBlocksList();
server.reportBadBlocks(PBHelper.convertLocatedBlock(
(LocatedBlockProto[]) req.getBlocksList().toArray()));
bl.toArray(new LocatedBlockProto[bl.size()])));
} catch (IOException e) {
throw new ServiceException(e);
}
@ -399,7 +413,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
public ConcatResponseProto concat(RpcController controller,
ConcatRequestProto req) throws ServiceException {
try {
server.concat(req.getTrg(), (String[])req.getSrcsList().toArray());
List<String> srcs = req.getSrcsList();
server.concat(req.getTrg(), srcs.toArray(new String[srcs.size()]));
} catch (IOException e) {
throw new ServiceException(e);
}
@ -456,14 +471,21 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
}
}
static final GetListingResponseProto NULL_GETLISTING_RESPONSE =
GetListingResponseProto.newBuilder().build();
@Override
public GetListingResponseProto getListing(RpcController controller,
GetListingRequestProto req) throws ServiceException {
try {
DirectoryListingProto result = PBHelper.convert(server.getListing(
DirectoryListing result = server.getListing(
req.getSrc(), req.getStartAfter().toByteArray(),
req.getNeedLocation()));
return GetListingResponseProto.newBuilder().setDirList(result).build();
req.getNeedLocation());
if (result !=null) {
return GetListingResponseProto.newBuilder().setDirList(
PBHelper.convert(result)).build();
} else {
return NULL_GETLISTING_RESPONSE;
}
} catch (IOException e) {
throw new ServiceException(e);
}
@ -494,6 +516,19 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
}
}
@Override
public RestoreFailedStorageResponseProto restoreFailedStorage(
RpcController controller, RestoreFailedStorageRequestProto req)
throws ServiceException {
try {
boolean result = server.restoreFailedStorage(req.getArg());
return RestoreFailedStorageResponseProto.newBuilder().setResult(result)
.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public GetFsStatsResponseProto getFsStats(RpcController controller,
GetFsStatusRequestProto req) throws ServiceException {
@ -557,19 +592,6 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
}
@Override
public RestoreFailedStorageResponseProto restoreFailedStorage(
RpcController controller, RestoreFailedStorageRequestProto req)
throws ServiceException {
try {
boolean result = server.restoreFailedStorage(req.getArg());
return RestoreFailedStorageResponseProto.newBuilder().setResult(result)
.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
static final RefreshNodesResponseProto VOID_REFRESHNODES_RESPONSE =
RefreshNodesResponseProto.newBuilder().build();
@ -622,9 +644,10 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
RpcController controller, ListCorruptFileBlocksRequestProto req)
throws ServiceException {
try {
CorruptFileBlocksProto result = PBHelper.convert(server
.listCorruptFileBlocks(req.getPath(), req.getCookie()));
return ListCorruptFileBlocksResponseProto.newBuilder().setCorrupt(result)
CorruptFileBlocks result = server.listCorruptFileBlocks(
req.getPath(), req.hasCookie() ? req.getCookie(): null);
return ListCorruptFileBlocksResponseProto.newBuilder()
.setCorrupt(PBHelper.convert(result))
.build();
} catch (IOException e) {
throw new ServiceException(e);
@ -646,29 +669,40 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
}
static final GetFileInfoResponseProto NULL_GETFILEINFO_RESPONSE =
GetFileInfoResponseProto.newBuilder().build();
@Override
public GetFileInfoResponseProto getFileInfo(RpcController controller,
GetFileInfoRequestProto req) throws ServiceException {
try {
HdfsFileStatus res = server.getFileInfo(req.getSrc());
GetFileInfoResponseProto.Builder builder =
GetFileInfoResponseProto.newBuilder();
if (res != null) {
builder.setFs(PBHelper.convert(res));
HdfsFileStatus result = server.getFileInfo(req.getSrc());
if (result != null) {
return GetFileInfoResponseProto.newBuilder().setFs(
PBHelper.convert(result)).build();
}
return builder.build();
return NULL_GETFILEINFO_RESPONSE;
} catch (IOException e) {
throw new ServiceException(e);
}
}
static final GetFileLinkInfoResponseProto NULL_GETFILELINKINFO_RESPONSE =
GetFileLinkInfoResponseProto.newBuilder().build();
@Override
public GetFileLinkInfoResponseProto getFileLinkInfo(RpcController controller,
GetFileLinkInfoRequestProto req) throws ServiceException {
try {
HdfsFileStatusProto result =
PBHelper.convert(server.getFileLinkInfo(req.getSrc()));
return GetFileLinkInfoResponseProto.newBuilder().setFs(result).build();
HdfsFileStatus result = server.getFileLinkInfo(req.getSrc());
if (result != null) {
System.out.println("got non null result for getFileLinkInfo for " + req.getSrc());
return GetFileLinkInfoResponseProto.newBuilder().setFs(
PBHelper.convert(result)).build();
} else {
System.out.println("got null result for getFileLinkInfo for " + req.getSrc());
return NULL_GETFILELINKINFO_RESPONSE;
}
} catch (IOException e) {
throw new ServiceException(e);
}
@ -679,10 +713,9 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
RpcController controller, GetContentSummaryRequestProto req)
throws ServiceException {
try {
ContentSummaryProto result =
PBHelper.convert(server.getContentSummary(req.getPath()));
return
GetContentSummaryResponseProto.newBuilder().setSummary(result).build();
ContentSummary result = server.getContentSummary(req.getPath());
return GetContentSummaryResponseProto.newBuilder()
.setSummary(PBHelper.convert(result)).build();
} catch (IOException e) {
throw new ServiceException(e);
}
@ -780,10 +813,11 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
public UpdatePipelineResponseProto updatePipeline(RpcController controller,
UpdatePipelineRequestProto req) throws ServiceException {
try {
List<DatanodeIDProto> newNodes = req.getNewNodesList();
server
.updatePipeline(req.getClientName(), PBHelper.convert(req
.getOldBlock()), PBHelper.convert(req.getNewBlock()), PBHelper
.convert((DatanodeIDProto[]) req.getNewNodesList().toArray()));
.convert(newNodes.toArray(new DatanodeIDProto[newNodes.size()])));
return VOID_UPDATEPIPELINE_RESPONSE;
} catch (IOException e) {
throw new ServiceException(e);

View File

@ -76,6 +76,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CancelDelegationTokenRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto;
@ -95,9 +96,11 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDel
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLinkTargetRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetListingRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetListingResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
@ -121,6 +124,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSaf
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DirectoryListingProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.HdfsFileStatusProto;
import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;
@ -263,7 +268,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setClientName(clientName)
.build();
try {
return PBHelper.convert(rpcProxy.append(null, req).getBlock());
AppendResponseProto res = rpcProxy.append(null, req);
return res.hasBlock() ? PBHelper.convert(res.getBlock()) : null;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@ -304,13 +310,14 @@ public class ClientNamenodeProtocolTranslatorPB implements
public void setOwner(String src, String username, String groupname)
throws AccessControlException, FileNotFoundException, SafeModeException,
UnresolvedLinkException, IOException {
SetOwnerRequestProto req = SetOwnerRequestProto.newBuilder()
.setSrc(src)
.setUsername(username)
.setGroupname(groupname)
.build();
SetOwnerRequestProto.Builder req = SetOwnerRequestProto.newBuilder()
.setSrc(src);
if (username != null)
req.setUsername(username);
if (groupname != null)
req.setGroupname(groupname);
try {
rpcProxy.setOwner(null, req);
rpcProxy.setOwner(null, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@ -335,15 +342,14 @@ public class ClientNamenodeProtocolTranslatorPB implements
throws AccessControlException, FileNotFoundException,
NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
IOException {
AddBlockRequestProto.Builder builder = AddBlockRequestProto.newBuilder();
builder.setSrc(src)
.setClientName(clientName)
.addAllExcludeNodes(Arrays.asList(PBHelper.convert(excludeNodes)));
if (previous != null) {
builder.setPrevious(PBHelper.convert(previous));
}
AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder().setSrc(src)
.setClientName(clientName);
if (previous != null)
req.setPrevious(PBHelper.convert(previous));
if (excludeNodes != null)
req.addAllExcludeNodes(Arrays.asList(PBHelper.convert(excludeNodes)));
try {
return PBHelper.convert(rpcProxy.addBlock(null, builder.build()).getBlock());
return PBHelper.convert(rpcProxy.addBlock(null, req.build()).getBlock());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@ -376,13 +382,13 @@ public class ClientNamenodeProtocolTranslatorPB implements
public boolean complete(String src, String clientName, ExtendedBlock last)
throws AccessControlException, FileNotFoundException, SafeModeException,
UnresolvedLinkException, IOException {
CompleteRequestProto req = CompleteRequestProto.newBuilder()
CompleteRequestProto.Builder req = CompleteRequestProto.newBuilder()
.setSrc(src)
.setClientName(clientName)
.setLast(PBHelper.convert(last))
.build();
.setClientName(clientName);
if (last != null)
req.setLast(PBHelper.convert(last));
try {
return rpcProxy.complete(null, req).getResult();
return rpcProxy.complete(null, req.build()).getResult();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@ -493,7 +499,12 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setStartAfter(ByteString.copyFrom(startAfter))
.setNeedLocation(needLocation).build();
try {
return PBHelper.convert(rpcProxy.getListing(null, req).getDirList());
GetListingResponseProto result = rpcProxy.getListing(null, req);
if (result.hasDirList()) {
return PBHelper.convert(result.getDirList());
}
return null;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@ -635,11 +646,13 @@ public class ClientNamenodeProtocolTranslatorPB implements
@Override
public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
throws IOException {
ListCorruptFileBlocksRequestProto req = ListCorruptFileBlocksRequestProto
.newBuilder().setPath(path).setCookie(cookie).build();
ListCorruptFileBlocksRequestProto.Builder req =
ListCorruptFileBlocksRequestProto.newBuilder().setPath(path);
if (cookie != null)
req.setCookie(cookie);
try {
return PBHelper.convert(
rpcProxy.listCorruptFileBlocks(null, req).getCorrupt());
rpcProxy.listCorruptFileBlocks(null, req.build()).getCorrupt());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@ -676,7 +689,9 @@ public class ClientNamenodeProtocolTranslatorPB implements
GetFileLinkInfoRequestProto req = GetFileLinkInfoRequestProto.newBuilder()
.setSrc(src).build();
try {
return PBHelper.convert(rpcProxy.getFileLinkInfo(null, req).getFs());
GetFileLinkInfoResponseProto result = rpcProxy.getFileLinkInfo(null, req);
return result.hasFs() ?
PBHelper.convert(rpcProxy.getFileLinkInfo(null, req).getFs()) : null;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}

View File

@ -170,7 +170,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements DatanodeProtocol,
throws IOException {
HeartbeatRequestProto req = HeartbeatRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration)).setCapacity(capacity)
.setCapacity(dfsUsed).setRemaining(remaining)
.setDfsUsed(dfsUsed).setRemaining(remaining)
.setBlockPoolUsed(blockPoolUsed).setXmitsInProgress(xmitsInProgress)
.setXceiverCount(xceiverCount).setFailedVolumes(failedVolumes).build();
HeartbeatResponseProto resp;
@ -196,7 +196,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements DatanodeProtocol,
.setBlockPoolId(poolId);
if (blocks != null) {
for (int i = 0; i < blocks.length; i++) {
builder.setBlocks(i, blocks[i]);
builder.addBlocks(blocks[i]);
}
}
BlockReportRequestProto req = builder.build();
@ -219,7 +219,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements DatanodeProtocol,
.setBlockPoolId(poolId);
if (receivedAndDeletedBlocks != null) {
for (int i = 0; i < receivedAndDeletedBlocks.length; i++) {
builder.setBlocks(i, PBHelper.convert(receivedAndDeletedBlocks[i]));
builder.addBlocks(PBHelper.convert(receivedAndDeletedBlocks[i]));
}
}
BlockReceivedAndDeletedRequestProto req = builder.build();
@ -292,7 +292,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements DatanodeProtocol,
.setNewLength(newlength).setCloseFile(closeFile)
.setDeleteBlock(deleteblock);
for (int i = 0; i < newtargets.length; i++) {
builder.setNewTaragets(i, PBHelper.convert(newtargets[i]));
builder.addNewTaragets(PBHelper.convert(newtargets[i]));
}
CommitBlockSynchronizationRequestProto req = builder.build();
try {

View File

@ -122,7 +122,7 @@ public class DatanodeProtocolServerSideTranslatorPB implements
@Override
public BlockReportResponseProto blockReport(RpcController controller,
BlockReportRequestProto request) throws ServiceException {
DatanodeCommand cmd;
DatanodeCommand cmd = null;
List<Long> blockIds = request.getBlocksList();
long[] blocks = new long[blockIds.size()];
for (int i = 0; i < blockIds.size(); i++) {

View File

@ -667,6 +667,9 @@ public class PBHelper {
case DatanodeProtocol.DNA_INVALIDATE:
builder.setAction(BlockCommandProto.Action.INVALIDATE);
break;
case DatanodeProtocol.DNA_SHUTDOWN:
builder.setAction(BlockCommandProto.Action.SHUTDOWN);
break;
}
Block[] blocks = cmd.getBlocks();
for (int i = 0; i < blocks.length; i++) {
@ -687,6 +690,10 @@ public class PBHelper {
public static DatanodeCommandProto convert(DatanodeCommand datanodeCommand) {
DatanodeCommandProto.Builder builder = DatanodeCommandProto.newBuilder();
if (datanodeCommand == null) {
return builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand)
.build();
}
switch (datanodeCommand.getAction()) {
case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
builder.setCmdType(DatanodeCommandProto.Type.BalancerBandwidthCommand)
@ -713,11 +720,18 @@ public class PBHelper {
break;
case DatanodeProtocol.DNA_TRANSFER:
case DatanodeProtocol.DNA_INVALIDATE:
case DatanodeProtocol.DNA_SHUTDOWN:
builder.setCmdType(DatanodeCommandProto.Type.BlockCommand).setBlkCmd(
PBHelper.convert((BlockCommand) datanodeCommand));
break;
case DatanodeProtocol.DNA_SHUTDOWN: //Not expected
case DatanodeProtocol.DNA_UC_ACTION_REPORT_STATUS:
case DatanodeProtocol.DNA_UC_ACTION_START_UPGRADE:
builder.setCmdType(DatanodeCommandProto.Type.UpgradeCommand)
.setUpgradeCmd(PBHelper.convert((UpgradeCommand) datanodeCommand));
break;
case DatanodeProtocol.DNA_UNKNOWN: //Not expected
default:
builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand);
}
return builder.build();
}
@ -756,13 +770,15 @@ public class PBHelper {
public static BlockCommand convert(BlockCommandProto blkCmd) {
List<BlockProto> blockProtoList = blkCmd.getBlocksList();
List<DatanodeInfosProto> targetList = blkCmd.getTargetsList();
DatanodeInfo[][] targets = new DatanodeInfo[blockProtoList.size()][];
Block[] blocks = new Block[blockProtoList.size()];
for (int i = 0; i < blockProtoList.size(); i++) {
targets[i] = PBHelper.convert(targetList.get(i));
blocks[i] = PBHelper.convert(blockProtoList.get(i));
}
List<DatanodeInfosProto> targetList = blkCmd.getTargetsList();
DatanodeInfo[][] targets = new DatanodeInfo[targetList.size()][];
for (int i = 0; i < targetList.size(); i++) {
targets[i] = PBHelper.convert(targetList.get(i));
}
int action = DatanodeProtocol.DNA_UNKNOWN;
switch (blkCmd.getAction()) {
case TRANSFER:
@ -771,6 +787,9 @@ public class PBHelper {
case INVALIDATE:
action = DatanodeProtocol.DNA_INVALIDATE;
break;
case SHUTDOWN:
action = DatanodeProtocol.DNA_SHUTDOWN;
break;
}
return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets);
}
@ -802,9 +821,13 @@ public class PBHelper {
}
public static UpgradeCommandProto convert(UpgradeCommand comm) {
UpgradeCommandProto.Builder builder = UpgradeCommandProto.newBuilder()
.setVersion(comm.getVersion())
.setUpgradeStatus(comm.getCurrentStatus());
UpgradeCommandProto.Builder builder = UpgradeCommandProto.newBuilder();
if (comm == null) {
return builder.setAction(UpgradeCommandProto.Action.UNKNOWN)
.setVersion(0).setUpgradeStatus(0).build();
}
builder.setVersion(comm.getVersion()).setUpgradeStatus(
comm.getCurrentStatus());
switch (comm.getAction()) {
case UpgradeCommand.UC_ACTION_REPORT_STATUS:
builder.setAction(UpgradeCommandProto.Action.REPORT_STATUS);
@ -953,6 +976,13 @@ public class PBHelper {
if ((flag & CreateFlagProto.APPEND_VALUE) == CreateFlagProto.APPEND_VALUE) {
result.add(CreateFlag.APPEND);
}
if ((flag & CreateFlagProto.CREATE_VALUE) == CreateFlagProto.CREATE_VALUE) {
result.add(CreateFlag.CREATE);
}
if ((flag & CreateFlagProto.OVERWRITE_VALUE)
== CreateFlagProto.OVERWRITE_VALUE) {
result.add(CreateFlag.OVERWRITE);
}
return new EnumSetWritable<CreateFlag>(result);
}
@ -984,7 +1014,7 @@ public class PBHelper {
public static HdfsFileStatusProto convert(HdfsFileStatus fs) {
if (fs == null)
return null;
FileType fType = FileType.IS_DIR;;
FileType fType = FileType.IS_FILE;
if (fs.isDir()) {
fType = FileType.IS_DIR;
} else if (fs.isSymlink()) {
@ -1003,8 +1033,7 @@ public class PBHelper {
setOwner(fs.getOwner()).
setGroup(fs.getGroup()).
setPath(ByteString.copyFrom(fs.getLocalNameInBytes()));
if (fs.getSymlink() != null) {
if (fs.isSymlink()) {
builder.setSymlink(ByteString.copyFrom(fs.getSymlinkInBytes()));
}
if (fs instanceof HdfsLocatedFileStatus) {
@ -1031,7 +1060,7 @@ public class PBHelper {
final int len = fs.length;
HdfsFileStatus[] result = new HdfsFileStatus[len];
for (int i = 0; i < len; ++i) {
PBHelper.convert(fs[i]);
result[i] = PBHelper.convert(fs[i]);
}
return result;
}
@ -1039,9 +1068,11 @@ public class PBHelper {
public static DirectoryListing convert(DirectoryListingProto dl) {
if (dl == null)
return null;
List<HdfsFileStatusProto> partList = dl.getPartialListingList();
return new DirectoryListing(
PBHelper.convert((HdfsFileStatusProto[])
dl.getPartialListingList().toArray()),
partList.isEmpty() ? new HdfsFileStatus[0]
: PBHelper.convert(
partList.toArray(new HdfsFileStatusProto[partList.size()])),
dl.getRemainingEntries());
}

View File

@ -20,8 +20,6 @@ package org.apache.hadoop.hdfs.protocolR23Compatible;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.avro.reflect.Nullable;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CreateFlag;
@ -97,7 +95,6 @@ public interface ClientNamenodeWireProtocol extends VersionedProtocol {
* The specification of this method matches that of
* {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#getBlockLocations}
*/
@Nullable
public LocatedBlocksWritable getBlockLocations(String src,
long offset,
long length)
@ -175,7 +172,7 @@ public interface ClientNamenodeWireProtocol extends VersionedProtocol {
* org.apache.hadoop.hdfs.protocol.DatanodeInfo[])}
*/
public LocatedBlockWritable addBlock(String src, String clientName,
@Nullable ExtendedBlockWritable previous, @Nullable DatanodeInfoWritable[] excludeNodes)
ExtendedBlockWritable previous, DatanodeInfoWritable[] excludeNodes)
throws AccessControlException, FileNotFoundException,
NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
IOException;
@ -344,7 +341,6 @@ public interface ClientNamenodeWireProtocol extends VersionedProtocol {
* The specification of this method matches that of
* {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#distributedUpgradeProgress}
*/
@Nullable
public UpgradeStatusReportWritable distributedUpgradeProgress(
UpgradeAction action)
throws IOException;
@ -373,7 +369,6 @@ public interface ClientNamenodeWireProtocol extends VersionedProtocol {
* The specification of this method matches that of
* {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#getFileInfo(String)}
*/
@Nullable
public HdfsFileStatusWritable getFileInfo(String src)
throws AccessControlException,
FileNotFoundException, UnresolvedLinkException, IOException;

View File

@ -34,8 +34,6 @@ import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.avro.reflect.Nullable;
/**
* DatanodeInfo represents the status of a DataNode.
* This object is used for communication in the
@ -55,7 +53,6 @@ public class DatanodeInfoWritable extends DatanodeIDWritable {
/** HostName as supplied by the datanode during registration as its
* name. Namenode uses datanode IP address as the name.
*/
@Nullable
protected String hostName = null;
// administrative states of a datanode
@ -82,7 +79,6 @@ public class DatanodeInfoWritable extends DatanodeIDWritable {
}
}
@Nullable
protected AdminStates adminState;
static public DatanodeInfo convertDatanodeInfo(DatanodeInfoWritable di) {

View File

@ -30,8 +30,6 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
import org.apache.avro.reflect.Nullable;
/** Interface that represents the over the wire information for a file.
*/
@InterfaceAudience.Private
@ -46,7 +44,6 @@ public class HdfsFileStatusWritable implements Writable {
}
private byte[] path; // local name of the inode that's encoded in java UTF8
@Nullable
private byte[] symlink; // symlink target encoded in java UTF8 or null
private long length;
private boolean isdir;

View File

@ -29,8 +29,6 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
import org.apache.avro.reflect.Nullable;
/**
* Collection of blocks with their locations and the file length.
*/
@ -40,7 +38,6 @@ public class LocatedBlocksWritable implements Writable {
private long fileLength;
private List<LocatedBlockWritable> blocks; // array of blocks with prioritized locations
private boolean underConstruction;
@Nullable
private LocatedBlockWritable lastLocatedBlock = null;
private boolean isLastBlockComplete = false;

View File

@ -29,6 +29,13 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
@ -405,7 +412,7 @@ class BPOfferService {
* @return a proxy to the active NN
*/
@Deprecated
synchronized DatanodeProtocol getActiveNN() {
synchronized DatanodeProtocolClientSideTranslatorPB getActiveNN() {
if (bpServiceToActive != null) {
return bpServiceToActive.bpNamenode;
} else {
@ -622,10 +629,10 @@ class BPOfferService {
* Connect to the NN at the given address. This is separated out for ease
* of testing.
*/
DatanodeProtocol connectToNN(InetSocketAddress nnAddr)
DatanodeProtocolClientSideTranslatorPB connectToNN(InetSocketAddress nnAddr)
throws IOException {
return (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
DatanodeProtocol.versionID, nnAddr, dn.getConf());
return new DatanodeProtocolClientSideTranslatorPB(nnAddr,
dn.getConf());
}
}

View File

@ -35,11 +35,11 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@ -74,7 +74,7 @@ class BPServiceActor implements Runnable {
boolean resetBlockReportTime = true;
Thread bpThread;
DatanodeProtocol bpNamenode;
DatanodeProtocolClientSideTranslatorPB bpNamenode;
private long lastHeartbeat = 0;
private volatile boolean initialized = false;
private final LinkedList<ReceivedDeletedBlockInfo> receivedAndDeletedBlockList
@ -119,7 +119,7 @@ class BPServiceActor implements Runnable {
* Used to inject a spy NN in the unit tests.
*/
@VisibleForTesting
void setNameNode(DatanodeProtocol dnProtocol) {
void setNameNode(DatanodeProtocolClientSideTranslatorPB dnProtocol) {
bpNamenode = dnProtocol;
}

View File

@ -111,6 +111,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
@ -1878,7 +1879,8 @@ public class DataNode extends Configured
* @return Namenode corresponding to the bpid
* @throws IOException
*/
public DatanodeProtocol getBPNamenode(String bpid) throws IOException {
public DatanodeProtocolClientSideTranslatorPB getBPNamenode(String bpid)
throws IOException {
BPOfferService bpos = blockPoolManager.get(bpid);
if (bpos == null) {
throw new IOException("cannot find a namnode proxy for bpid=" + bpid);
@ -1890,7 +1892,8 @@ public class DataNode extends Configured
void syncBlock(RecoveringBlock rBlock,
List<BlockRecord> syncList) throws IOException {
ExtendedBlock block = rBlock.getBlock();
DatanodeProtocol nn = getBPNamenode(block.getBlockPoolId());
DatanodeProtocolClientSideTranslatorPB nn = getBPNamenode(block
.getBlockPoolId());
long recoveryId = rBlock.getNewGenerationStamp();
if (LOG.isDebugEnabled()) {

View File

@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.web.resources.DelegationParam;
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
import org.apache.hadoop.hdfs.web.resources.LengthParam;
import org.apache.hadoop.hdfs.web.resources.NamenodeRpcAddressParam;
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
import org.apache.hadoop.hdfs.web.resources.Param;
@ -89,7 +90,8 @@ public class DatanodeWebHdfsMethods {
private @Context ServletContext context;
private @Context HttpServletResponse response;
private void init(final UserGroupInformation ugi, final DelegationParam delegation,
private void init(final UserGroupInformation ugi,
final DelegationParam delegation, final InetSocketAddress nnRpcAddr,
final UriFsPathParam path, final HttpOpParam<?> op,
final Param<?, ?>... parameters) throws IOException {
if (LOG.isTraceEnabled()) {
@ -102,9 +104,8 @@ public class DatanodeWebHdfsMethods {
if (UserGroupInformation.isSecurityEnabled()) {
//add a token for RPC.
final DataNode datanode = (DataNode)context.getAttribute("datanode");
final InetSocketAddress nnRpcAddr = NameNode.getAddress(datanode.getConf());
final Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
final Token<DelegationTokenIdentifier> token =
new Token<DelegationTokenIdentifier>();
token.decodeFromUrlString(delegation.getValue());
SecurityUtil.setTokenService(token, nnRpcAddr);
token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
@ -122,6 +123,9 @@ public class DatanodeWebHdfsMethods {
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@QueryParam(NamenodeRpcAddressParam.NAME)
@DefaultValue(NamenodeRpcAddressParam.DEFAULT)
final NamenodeRpcAddressParam namenodeRpcAddress,
@QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
final PutOpParam op,
@QueryParam(PermissionParam.NAME) @DefaultValue(PermissionParam.DEFAULT)
@ -135,8 +139,8 @@ public class DatanodeWebHdfsMethods {
@QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
final BlockSizeParam blockSize
) throws IOException, InterruptedException {
return put(in, ugi, delegation, ROOT, op, permission, overwrite, bufferSize,
replication, blockSize);
return put(in, ugi, delegation, namenodeRpcAddress, ROOT, op, permission,
overwrite, bufferSize, replication, blockSize);
}
/** Handle HTTP PUT request. */
@ -149,6 +153,9 @@ public class DatanodeWebHdfsMethods {
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@QueryParam(NamenodeRpcAddressParam.NAME)
@DefaultValue(NamenodeRpcAddressParam.DEFAULT)
final NamenodeRpcAddressParam namenodeRpcAddress,
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
@QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
final PutOpParam op,
@ -164,8 +171,9 @@ public class DatanodeWebHdfsMethods {
final BlockSizeParam blockSize
) throws IOException, InterruptedException {
init(ugi, delegation, path, op, permission, overwrite, bufferSize,
replication, blockSize);
final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue();
init(ugi, delegation, nnRpcAddr, path, op, permission,
overwrite, bufferSize, replication, blockSize);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override
@ -178,7 +186,6 @@ public class DatanodeWebHdfsMethods {
case CREATE:
{
final Configuration conf = new Configuration(datanode.getConf());
final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
conf.set(FsPermission.UMASK_LABEL, "000");
final int b = bufferSize.getValue(conf);
@ -221,12 +228,15 @@ public class DatanodeWebHdfsMethods {
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@QueryParam(NamenodeRpcAddressParam.NAME)
@DefaultValue(NamenodeRpcAddressParam.DEFAULT)
final NamenodeRpcAddressParam namenodeRpcAddress,
@QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
final PostOpParam op,
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
final BufferSizeParam bufferSize
) throws IOException, InterruptedException {
return post(in, ugi, delegation, ROOT, op, bufferSize);
return post(in, ugi, delegation, namenodeRpcAddress, ROOT, op, bufferSize);
}
/** Handle HTTP POST request. */
@ -239,6 +249,9 @@ public class DatanodeWebHdfsMethods {
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@QueryParam(NamenodeRpcAddressParam.NAME)
@DefaultValue(NamenodeRpcAddressParam.DEFAULT)
final NamenodeRpcAddressParam namenodeRpcAddress,
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
@QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
final PostOpParam op,
@ -246,7 +259,8 @@ public class DatanodeWebHdfsMethods {
final BufferSizeParam bufferSize
) throws IOException, InterruptedException {
init(ugi, delegation, path, op, bufferSize);
final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue();
init(ugi, delegation, nnRpcAddr, path, op, bufferSize);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override
@ -259,7 +273,6 @@ public class DatanodeWebHdfsMethods {
case APPEND:
{
final Configuration conf = new Configuration(datanode.getConf());
final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
final int b = bufferSize.getValue(conf);
DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
FSDataOutputStream out = null;
@ -291,6 +304,9 @@ public class DatanodeWebHdfsMethods {
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@QueryParam(NamenodeRpcAddressParam.NAME)
@DefaultValue(NamenodeRpcAddressParam.DEFAULT)
final NamenodeRpcAddressParam namenodeRpcAddress,
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
final GetOpParam op,
@QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
@ -300,7 +316,8 @@ public class DatanodeWebHdfsMethods {
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
final BufferSizeParam bufferSize
) throws IOException, InterruptedException {
return get(ugi, delegation, ROOT, op, offset, length, bufferSize);
return get(ugi, delegation, namenodeRpcAddress, ROOT, op, offset, length,
bufferSize);
}
/** Handle HTTP GET request. */
@ -311,6 +328,9 @@ public class DatanodeWebHdfsMethods {
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@QueryParam(NamenodeRpcAddressParam.NAME)
@DefaultValue(NamenodeRpcAddressParam.DEFAULT)
final NamenodeRpcAddressParam namenodeRpcAddress,
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
final GetOpParam op,
@ -322,7 +342,8 @@ public class DatanodeWebHdfsMethods {
final BufferSizeParam bufferSize
) throws IOException, InterruptedException {
init(ugi, delegation, path, op, offset, length, bufferSize);
final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue();
init(ugi, delegation, nnRpcAddr, path, op, offset, length, bufferSize);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override
@ -331,7 +352,6 @@ public class DatanodeWebHdfsMethods {
final String fullpath = path.getAbsolutePath();
final DataNode datanode = (DataNode)context.getAttribute("datanode");
final Configuration conf = new Configuration(datanode.getConf());
final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
switch(op.getValue()) {
case OPEN:

View File

@ -294,7 +294,7 @@ public class FSEditLog {
*/
synchronized void close() {
if (state == State.CLOSED) {
LOG.warn("Closing log when already closed", new Exception());
LOG.debug("Closing log when already closed");
return;
}
if (state == State.IN_SEGMENT) {

View File

@ -62,11 +62,15 @@ import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol;
import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeProtocolServerSideTranslatorR23;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
@ -92,6 +96,7 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
@ -144,13 +149,22 @@ class NameNodeRpcServer implements NamenodeProtocols {
conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
DFS_DATANODE_HANDLER_COUNT_DEFAULT);
InetSocketAddress socAddr = nn.getRpcServerAddress(conf);
ClientNamenodeProtocolServerSideTranslatorR23
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
ProtobufRpcEngine.class);
ClientNamenodeProtocolServerSideTranslatorPB
clientProtocolServerTranslator =
new ClientNamenodeProtocolServerSideTranslatorR23(this);
new ClientNamenodeProtocolServerSideTranslatorPB(this);
BlockingService clientNNPbService = ClientNamenodeProtocol.
newReflectiveBlockingService(clientProtocolServerTranslator);
DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator =
new DatanodeProtocolServerSideTranslatorPB(this);
BlockingService dnProtoPbService = DatanodeProtocolService
.newReflectiveBlockingService(dnProtoPbTranslator);
NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =
new NamenodeProtocolServerSideTranslatorPB(this);
BlockingService service = NamenodeProtocolService
BlockingService NNPbService = NamenodeProtocolService
.newReflectiveBlockingService(namenodeProtocolXlator);
InetSocketAddress dnSocketAddr = nn.getServiceRpcServerAddress(conf);
@ -160,13 +174,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
// Add all the RPC protocols that the namenode implements
this.serviceRpcServer =
RPC.getServer(org.apache.hadoop.hdfs.protocolR23Compatible.
ClientNamenodeWireProtocol.class, clientProtocolServerTranslator,
RPC.getServer(org.apache.hadoop.hdfs.protocolPB.
ClientNamenodeProtocolPB.class, clientNNPbService,
dnSocketAddr.getHostName(), dnSocketAddr.getPort(),
serviceHandlerCount,
false, conf, namesystem.getDelegationTokenSecretManager());
this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
DatanodeProtocol.class, this);
this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
RefreshAuthorizationPolicyProtocol.class, this);
this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
@ -175,7 +187,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
GetUserMappingsProtocol.class, this);
this.serviceRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
HAServiceProtocol.class, this);
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, service,
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
serviceRpcServer);
DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
serviceRpcServer);
this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
@ -186,13 +200,10 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
// Add all the RPC protocols that the namenode implements
this.clientRpcServer = RPC.getServer(
org.apache.hadoop.hdfs.protocolR23Compatible.
ClientNamenodeWireProtocol.class,
clientProtocolServerTranslator, socAddr.getHostName(),
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class,
clientNNPbService, socAddr.getHostName(),
socAddr.getPort(), handlerCount, false, conf,
namesystem.getDelegationTokenSecretManager());
this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
DatanodeProtocol.class, this);
this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
RefreshAuthorizationPolicyProtocol.class, this);
this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
@ -201,7 +212,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
GetUserMappingsProtocol.class, this);
this.clientRpcServer.addProtocol(RpcKind.RPC_WRITABLE,
HAServiceProtocol.class, this);
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, service,
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
clientRpcServer);
DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
clientRpcServer);
// set service-level authorization security policy
@ -261,7 +274,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
long clientVersion) throws IOException {
if (protocol.equals(ClientProtocol.class.getName())) {
throw new IOException("Old Namenode Client protocol is not supported:" +
protocol + "Switch your clientside to " + ClientNamenodeWireProtocol.class);
protocol + "Switch your clientside to " + ClientNamenodeProtocol.class);
} else if (protocol.equals(DatanodeProtocol.class.getName())){
return DatanodeProtocol.versionID;
} else if (protocol.equals(NamenodeProtocol.class.getName())){

View File

@ -76,6 +76,7 @@ import org.apache.hadoop.hdfs.web.resources.GroupParam;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
import org.apache.hadoop.hdfs.web.resources.LengthParam;
import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam;
import org.apache.hadoop.hdfs.web.resources.NamenodeRpcAddressParam;
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
import org.apache.hadoop.hdfs.web.resources.OwnerParam;
@ -198,6 +199,7 @@ public class NamenodeWebHdfsMethods {
delegationQuery = "&" + new DelegationParam(t.encodeToUrlString());
}
final String query = op.toQueryString() + delegationQuery
+ "&" + new NamenodeRpcAddressParam(namenode)
+ Param.toSortedString("&", parameters);
final String uripath = WebHdfsFileSystem.PATH_PREFIX + path;

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.protocol;
import org.apache.avro.reflect.Union;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -25,13 +24,6 @@ import org.apache.hadoop.classification.InterfaceStability;
* Base class for data-node command.
* Issued by the name-node to notify data-nodes what should be done.
*/
// Declare subclasses for Avro's denormalized representation
@Union({Void.class,
RegisterCommand.class, FinalizeCommand.class,
BlockCommand.class, UpgradeCommand.class,
BlockRecoveryCommand.class, KeyUpdateCommand.class})
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class DatanodeCommand extends ServerCommand {

View File

@ -30,8 +30,6 @@ import org.apache.hadoop.hdfs.server.protocolR23Compatible.DatanodeWireProtocol;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.avro.reflect.Nullable;
/**********************************************************************
* Protocol that a DFS datanode uses to communicate with the NameNode.
* It's used to upload current load information and block reports.
@ -76,6 +74,8 @@ public interface DatanodeProtocol extends VersionedProtocol {
final static int DNA_RECOVERBLOCK = 6; // request a block recovery
final static int DNA_ACCESSKEYUPDATE = 7; // update access key
final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth
final static int DNA_UC_ACTION_REPORT_STATUS = 100; // Report upgrade status
final static int DNA_UC_ACTION_START_UPGRADE = 101; // start upgrade
/**
* Register Datanode.
@ -105,7 +105,6 @@ public interface DatanodeProtocol extends VersionedProtocol {
* @param failedVolumes number of failed volumes
* @throws IOException on error
*/
@Nullable
public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
long capacity,
long dfsUsed, long remaining,

View File

@ -41,8 +41,10 @@ import org.apache.hadoop.io.WritableFactory;
@InterfaceStability.Evolving
public class UpgradeCommand extends DatanodeCommand {
public final static int UC_ACTION_UNKNOWN = DatanodeProtocol.DNA_UNKNOWN;
public final static int UC_ACTION_REPORT_STATUS = 100; // report upgrade status
public final static int UC_ACTION_START_UPGRADE = 101; // start upgrade
public final static int UC_ACTION_REPORT_STATUS =
DatanodeProtocol.DNA_UC_ACTION_REPORT_STATUS;
public final static int UC_ACTION_START_UPGRADE =
DatanodeProtocol.DNA_UC_ACTION_START_UPGRADE;
private int version;
private short upgradeStatus;

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.protocolR23Compatible;
import java.io.IOException;
import org.apache.avro.reflect.Nullable;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@ -98,7 +97,6 @@ public interface DatanodeWireProtocol extends VersionedProtocol {
* @param failedVolumes number of failed volumes
* @throws IOException on error
*/
@Nullable
public HeartbeatResponseWritable sendHeartbeat(
DatanodeRegistrationWritable registration, long capacity, long dfsUsed,
long remaining, long blockPoolUsed, int xmitsInProgress,

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web.resources;
import java.net.InetSocketAddress;
/** InetSocketAddressParam parameter. */
abstract class InetSocketAddressParam
extends Param<InetSocketAddress, InetSocketAddressParam.Domain> {
InetSocketAddressParam(final Domain domain, final InetSocketAddress value) {
super(domain, value);
}
@Override
public String toString() {
return getName() + "=" + Domain.toString(getValue());
}
/** The domain of the parameter. */
static final class Domain extends Param.Domain<InetSocketAddress> {
Domain(final String paramName) {
super(paramName);
}
@Override
public String getDomain() {
return "<HOST:PORT>";
}
@Override
InetSocketAddress parse(final String str) {
final int i = str.indexOf(':');
if (i < 0) {
throw new IllegalArgumentException("Failed to parse \"" + str
+ "\" as " + getDomain() + ": the ':' character not found.");
} else if (i == 0) {
throw new IllegalArgumentException("Failed to parse \"" + str
+ "\" as " + getDomain() + ": HOST is empty.");
} else if (i == str.length() - 1) {
throw new IllegalArgumentException("Failed to parse \"" + str
+ "\" as " + getDomain() + ": PORT is empty.");
}
final String host = str.substring(0, i);
final int port;
try {
port = Integer.parseInt(str.substring(i + 1));
} catch(NumberFormatException e) {
throw new IllegalArgumentException("Failed to parse \"" + str
+ "\" as " + getDomain() + ": the ':' position is " + i
+ " but failed to parse PORT.", e);
}
try {
return new InetSocketAddress(host, port);
} catch(Exception e) {
throw new IllegalArgumentException("Failed to parse \"" + str
+ "\": cannot create InetSocketAddress(host=" + host
+ ", port=" + port + ")", e);
}
}
/** Convert an InetSocketAddress to a HOST:PORT String. */
static String toString(final InetSocketAddress addr) {
return addr.getHostName() + ":" + addr.getPort();
}
}
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web.resources;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
/** Namenode RPC address parameter. */
public class NamenodeRpcAddressParam extends InetSocketAddressParam {
/** Parameter name. */
public static final String NAME = "namenoderpcaddress";
/** Default parameter value. */
public static final String DEFAULT = "";
private static final Domain DOMAIN = new Domain(NAME);
/**
* Constructor.
* @param str a string representation of the parameter value.
*/
public NamenodeRpcAddressParam(final String str) {
super(DOMAIN, str == null || str.equals(DEFAULT)? null: DOMAIN.parse(str));
}
/**
* Construct an object using the RPC address of the given namenode.
*/
public NamenodeRpcAddressParam(final NameNode namenode) {
super(DOMAIN, namenode.getNameNodeAddress());
}
@Override
public String getName() {
return NAME;
}
}

View File

@ -74,7 +74,7 @@ message AppendRequestProto {
}
message AppendResponseProto {
required LocatedBlockProto block = 1;
optional LocatedBlockProto block = 1;
}
message SetReplicationRequestProto {
@ -96,8 +96,8 @@ message SetPermissionResponseProto { // void response
message SetOwnerRequestProto {
required string src = 1;
required string username = 2;
required string groupname = 3;
optional string username = 2;
optional string groupname = 3;
}
message SetOwnerResponseProto { // void response
@ -139,7 +139,7 @@ message GetAdditionalDatanodeResponseProto {
message CompleteRequestProto {
required string src = 1;
required string clientName = 2;
required ExtendedBlockProto last = 3;
optional ExtendedBlockProto last = 3;
}
message CompleteResponseProto {
@ -204,7 +204,7 @@ message GetListingRequestProto {
required bool needLocation = 3;
}
message GetListingResponseProto {
required DirectoryListingProto dirList = 1;
optional DirectoryListingProto dirList = 1;
}
message RenewLeaseRequestProto {
@ -311,7 +311,7 @@ message DistributedUpgradeProgressResponseProto {
message ListCorruptFileBlocksRequestProto {
required string path = 1;
required string cookie = 2;
optional string cookie = 2;
}
message ListCorruptFileBlocksResponseProto {
@ -338,7 +338,7 @@ message GetFileLinkInfoRequestProto {
}
message GetFileLinkInfoResponseProto {
required HdfsFileStatusProto fs = 1;
optional HdfsFileStatusProto fs = 1;
}
message GetContentSummaryRequestProto {

View File

@ -47,6 +47,7 @@ message DatanodeCommandProto {
KeyUpdateCommand = 4;
RegisterCommand = 5;
UpgradeCommand = 6;
NullDatanodeCommand = 7;
}
required Type cmdType = 1; // Type of the command
@ -80,6 +81,7 @@ message BlockCommandProto {
enum Action {
TRANSFER = 1; // Transfer blocks to another datanode
INVALIDATE = 2; // Invalidate blocks
SHUTDOWN = 3; // Shutdown the datanode
}
required Action action = 1;
required string blockPoolId = 2;
@ -205,7 +207,7 @@ message BlockReportRequestProto {
* cmd - Command from namenode to the datanode
*/
message BlockReportResponseProto {
required DatanodeCommandProto cmd = 1;
optional DatanodeCommandProto cmd = 1;
}
/**

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol;
import org.apache.hadoop.hdfs.server.common.Storage;
@ -514,37 +515,6 @@ public class MiniDFSCluster {
data_dir = new File(base_dir, "data");
this.waitSafeMode = waitSafeMode;
// use alternate RPC engine if spec'd
/*
Turned off - see HDFS-2647 and HDFS-2660 for related comments.
This test can be turned on when Avro RPC is enabled using mechanism
similar to protobuf.
String rpcEngineName = System.getProperty("hdfs.rpc.engine");
if (rpcEngineName != null && !"".equals(rpcEngineName)) {
LOG.info("HDFS using RPCEngine: " + rpcEngineName);
try {
Class<?> rpcEngine = conf.getClassByName(rpcEngineName);
setRpcEngine(conf, NamenodeProtocols.class, rpcEngine);
setRpcEngine(conf, ClientNamenodeWireProtocol.class, rpcEngine);
setRpcEngine(conf, ClientDatanodeProtocolPB.class, rpcEngine);
setRpcEngine(conf, NamenodeProtocolPB.class, rpcEngine);
setRpcEngine(conf, ClientProtocol.class, rpcEngine);
setRpcEngine(conf, DatanodeProtocol.class, rpcEngine);
setRpcEngine(conf, RefreshAuthorizationPolicyProtocol.class, rpcEngine);
setRpcEngine(conf, RefreshUserMappingsProtocol.class, rpcEngine);
setRpcEngine(conf, GetUserMappingsProtocol.class, rpcEngine);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
// disable service authorization, as it does not work with tunnelled RPC
conf.setBoolean(HADOOP_SECURITY_AUTHORIZATION,
false);
}
*/
int replication = conf.getInt(DFS_REPLICATION_KEY, 3);
conf.setInt(DFS_REPLICATION_KEY, Math.min(replication, numDataNodes));
conf.setInt(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@ -67,8 +68,8 @@ public class TestBPOfferService {
((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
}
private DatanodeProtocol mockNN1;
private DatanodeProtocol mockNN2;
private DatanodeProtocolClientSideTranslatorPB mockNN1;
private DatanodeProtocolClientSideTranslatorPB mockNN2;
private NNHAStatusHeartbeat[] mockHaStatuses = new NNHAStatusHeartbeat[2];
private int heartbeatCounts[] = new int[2];
private DataNode mockDn;
@ -100,8 +101,10 @@ public class TestBPOfferService {
/**
* Set up a mock NN with the bare minimum for a DN to register to it.
*/
private DatanodeProtocol setupNNMock(int nnIdx) throws Exception {
DatanodeProtocol mock = Mockito.mock(DatanodeProtocol.class);
private DatanodeProtocolClientSideTranslatorPB setupNNMock(int nnIdx)
throws Exception {
DatanodeProtocolClientSideTranslatorPB mock =
Mockito.mock(DatanodeProtocolClientSideTranslatorPB.class);
Mockito.doReturn(
new NamespaceInfo(1, FAKE_CLUSTERID, FAKE_BPID,
0, HdfsConstants.LAYOUT_VERSION))
@ -298,19 +301,21 @@ public class TestBPOfferService {
* Create a BPOfferService which registers with and heartbeats with the
* specified namenode proxy objects.
*/
private BPOfferService setupBPOSForNNs(DatanodeProtocol ... nns) {
private BPOfferService setupBPOSForNNs(
DatanodeProtocolClientSideTranslatorPB ... nns) {
// Set up some fake InetAddresses, then override the connectToNN
// function to return the corresponding proxies.
final Map<InetSocketAddress, DatanodeProtocol> nnMap = Maps.newLinkedHashMap();
final Map<InetSocketAddress, DatanodeProtocolClientSideTranslatorPB> nnMap = Maps.newLinkedHashMap();
for (int port = 0; port < nns.length; port++) {
nnMap.put(new InetSocketAddress(port), nns[port]);
}
return new BPOfferService(Lists.newArrayList(nnMap.keySet()), mockDn) {
@Override
DatanodeProtocol connectToNN(InetSocketAddress nnAddr) throws IOException {
DatanodeProtocol nn = nnMap.get(nnAddr);
DatanodeProtocolClientSideTranslatorPB connectToNN(InetSocketAddress nnAddr)
throws IOException {
DatanodeProtocolClientSideTranslatorPB nn = nnMap.get(nnAddr);
if (nn == null) {
throw new AssertionError("bad NN addr: " + nnAddr);
}
@ -329,7 +334,7 @@ public class TestBPOfferService {
}, 100, 10000);
}
private void waitForBlockReport(final DatanodeProtocol mockNN)
private void waitForBlockReport(final DatanodeProtocolClientSideTranslatorPB mockNN)
throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
@ -374,7 +379,7 @@ public class TestBPOfferService {
private ReceivedDeletedBlockInfo[] waitForBlockReceived(
ExtendedBlock fakeBlock,
DatanodeProtocol mockNN) throws Exception {
DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
final ArgumentCaptor<ReceivedDeletedBlockInfo[]> captor =
ArgumentCaptor.forClass(ReceivedDeletedBlockInfo[].class);
GenericTestUtils.waitFor(new Supplier<Boolean>() {

View File

@ -23,8 +23,8 @@ import static org.junit.Assert.fail;
import static org.mockito.Mockito.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.junit.Test;
import org.mockito.Mockito;
@ -49,7 +49,8 @@ public class TestDatanodeRegister {
NamespaceInfo fakeNSInfo = mock(NamespaceInfo.class);
when(fakeNSInfo.getBuildVersion()).thenReturn("NSBuildVersion");
DatanodeProtocol fakeDNProt = mock(DatanodeProtocol.class);
DatanodeProtocolClientSideTranslatorPB fakeDNProt =
mock(DatanodeProtocolClientSideTranslatorPB.class);
when(fakeDNProt.versionRequest()).thenReturn(fakeNSInfo);
actor.setNameNode( fakeDNProt );

View File

@ -0,0 +1,177 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web;
import java.net.InetSocketAddress;
import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Test WebHDFS with multiple NameNodes
*/
public class TestWebHdfsWithMultipleNameNodes {
static final Log LOG = WebHdfsTestUtil.LOG;
static private void setLogLevel() {
((Log4JLogger)LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)NamenodeWebHdfsMethods.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.OFF);
((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.OFF);
((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.OFF);
((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.OFF);
}
private static final Configuration conf = new HdfsConfiguration();
private static MiniDFSCluster cluster;
private static WebHdfsFileSystem[] webhdfs;
@BeforeClass
public static void setupTest() {
setLogLevel();
try {
setupCluster(4, 3);
} catch(Exception e) {
throw new RuntimeException(e);
}
}
private static void setupCluster(final int nNameNodes, final int nDataNodes)
throws Exception {
LOG.info("nNameNodes=" + nNameNodes + ", nDataNodes=" + nDataNodes);
conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(nNameNodes))
.numDataNodes(nDataNodes)
.build();
cluster.waitActive();
webhdfs = new WebHdfsFileSystem[nNameNodes];
for(int i = 0; i < webhdfs.length; i++) {
final InetSocketAddress addr = cluster.getNameNode(i).getHttpAddress();
final String uri = WebHdfsFileSystem.SCHEME + "://"
+ addr.getHostName() + ":" + addr.getPort() + "/";
webhdfs[i] = (WebHdfsFileSystem)FileSystem.get(new URI(uri), conf);
}
}
@AfterClass
public static void shutdownCluster() {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
private static String createString(String prefix, int i) {
//The suffix is to make sure the strings have different lengths.
final String suffix = "*********************".substring(0, i+1);
return prefix + i + suffix + "\n";
}
private static String[] createStrings(String prefix, String name) {
final String[] strings = new String[webhdfs.length];
for(int i = 0; i < webhdfs.length; i++) {
strings[i] = createString(prefix, i);
LOG.info(name + "[" + i + "] = " + strings[i]);
}
return strings;
}
@Test
public void testRedirect() throws Exception {
final String dir = "/testRedirect/";
final String filename = "file";
final Path p = new Path(dir, filename);
final String[] writeStrings = createStrings("write to webhdfs ", "write");
final String[] appendStrings = createStrings("append to webhdfs ", "append");
//test create: create a file for each namenode
for(int i = 0; i < webhdfs.length; i++) {
final FSDataOutputStream out = webhdfs[i].create(p);
out.write(writeStrings[i].getBytes());
out.close();
}
for(int i = 0; i < webhdfs.length; i++) {
//check file length
final long expected = writeStrings[i].length();
Assert.assertEquals(expected, webhdfs[i].getFileStatus(p).getLen());
}
//test read: check file content for each namenode
for(int i = 0; i < webhdfs.length; i++) {
final FSDataInputStream in = webhdfs[i].open(p);
for(int c, j = 0; (c = in.read()) != -1; j++) {
Assert.assertEquals(writeStrings[i].charAt(j), c);
}
in.close();
}
//test append: append to the file for each namenode
for(int i = 0; i < webhdfs.length; i++) {
final FSDataOutputStream out = webhdfs[i].append(p);
out.write(appendStrings[i].getBytes());
out.close();
}
for(int i = 0; i < webhdfs.length; i++) {
//check file length
final long expected = writeStrings[i].length() + appendStrings[i].length();
Assert.assertEquals(expected, webhdfs[i].getFileStatus(p).getLen());
}
//test read: check file content for each namenode
for(int i = 0; i < webhdfs.length; i++) {
final StringBuilder b = new StringBuilder();
final FSDataInputStream in = webhdfs[i].open(p);
for(int c; (c = in.read()) != -1; ) {
b.append((char)c);
}
final int wlen = writeStrings[i].length();
Assert.assertEquals(writeStrings[i], b.substring(0, wlen));
Assert.assertEquals(appendStrings[i], b.substring(wlen));
in.close();
}
}
}