HDFS-3573. Supply NamespaceInfo when instantiating JournalManagers. Contributed by Todd Lipcon and Ivan Kelly.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1403594 13f79535-47bb-0310-9956-ffa450edef68
commit c44e61588f
parent 7634ab0bef
@@ -89,6 +89,9 @@ Release 2.0.3-alpha - Unreleased
     HDFS-4121. Add namespace declarations in hdfs .proto files for languages
     other than java. (Binglin Chang via suresh)
 
+    HDFS-3573. Supply NamespaceInfo when instantiating JournalManagers.
+    (todd and ivank via umamahesh)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdfs.server.namenode.JournalManager;
 import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.conf.Configuration;
 
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -158,11 +159,19 @@ public class BookKeeperJournalManager implements JournalManager {
         (byte)(i) };
   }
 
+  BookKeeperJournalManager(Configuration conf, URI uri) throws IOException {
+    this(conf, uri, null);
+    // TODO(ivank): update BookKeeperJournalManager to do something
+    // with the NamespaceInfo. This constructor has been added
+    // for compatibility with the old tests, and may be removed
+    // when the tests are updated.
+  }
+
   /**
    * Construct a Bookkeeper journal manager.
    */
-  public BookKeeperJournalManager(Configuration conf, URI uri)
-      throws IOException {
+  public BookKeeperJournalManager(Configuration conf, URI uri,
+      NamespaceInfo nsInfo) throws IOException {
     this.conf = conf;
     String zkConnect = uri.getAuthority().replace(";", ",");
     String zkPath = uri.getPath();
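The package-private two-argument constructor above exists only so the older tests keep compiling; per the TODO, it simply delegates with a null NamespaceInfo. For orientation, a sketch of direct construction under the new three-argument signature follows. The URI scheme, host names, and path are illustrative assumptions, not taken from this patch, and nsInfo would normally be supplied through the reflective wiring in FSEditLog shown below. The ';' between quorum members is significant because the constructor rewrites it to ',' to form the ZooKeeper connect string:

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;

class BkjmConstructionSketch {  // hypothetical helper, not part of the patch
  static BookKeeperJournalManager open(Configuration conf, NamespaceInfo nsInfo)
      throws IOException {
    // ';' separates ZooKeeper quorum members in the authority; the
    // constructor replaces it with ',' before connecting (hosts assumed).
    URI uri = URI.create("bookkeeper://zk1:2181;zk2:2181;zk3:2181/hdfsjournal");
    return new BookKeeperJournalManager(conf, uri, nsInfo);
  }
}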
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
@@ -301,7 +302,7 @@ public class FSEditLog implements LogsPurgeable {
       endCurrentLogSegment(true);
     }
 
-    if (!journalSet.isEmpty()) {
+    if (journalSet != null && !journalSet.isEmpty()) {
       try {
         journalSet.close();
       } catch (IOException ioe) {
@@ -950,6 +951,9 @@ public class FSEditLog implements LogsPurgeable {
       minTxIdToKeep <= curSegmentTxId :
       "cannot purge logs older than txid " + minTxIdToKeep +
       " when current segment starts at " + curSegmentTxId;
+    if (minTxIdToKeep == 0) {
+      return;
+    }
 
     // This could be improved to not need synchronization. But currently,
     // journalSet is not threadsafe, so we need to synchronize this method.
@@ -1206,8 +1210,9 @@ public class FSEditLog implements LogsPurgeable {
 
     try {
       Constructor<? extends JournalManager> cons
-        = clazz.getConstructor(Configuration.class, URI.class);
-      return cons.newInstance(conf, uri);
+        = clazz.getConstructor(Configuration.class, URI.class,
+            NamespaceInfo.class);
+      return cons.newInstance(conf, uri, storage.getNamespaceInfo());
     } catch (Exception e) {
       throw new IllegalArgumentException("Unable to construct journal, "
           + uri, e);
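The reflective lookup above is what defines the contract for pluggable journals: a class registered under dfs.namenode.edits.journal-plugin.<scheme> must now expose a (Configuration, URI, NamespaceInfo) constructor, or construction fails with the IllegalArgumentException shown. A minimal sketch of a conforming plugin follows; the class name is hypothetical, and the many JournalManager methods are elided, so the sketch is declared abstract (a real plugin must be concrete for newInstance to succeed):

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;

// Hypothetical plugin skeleton. A real implementation must implement all
// JournalManager methods; they are elided here for brevity.
public abstract class ExampleJournalManager implements JournalManager {
  private final Configuration conf;
  private final URI uri;
  private final NamespaceInfo nsInfo;

  public ExampleJournalManager(Configuration conf, URI uri,
      NamespaceInfo nsInfo) throws IOException {
    this.conf = conf;
    this.uri = uri;
    // The namespace identity is now available at construction time,
    // e.g. to verify that an existing journal belongs to this namespace.
    this.nsInfo = nsInfo;
  }
}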
@@ -126,12 +126,6 @@ public class FSImage implements Closeable {
     }
 
     this.editLog = new FSEditLog(conf, storage, editsDirs);
-    String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
-    if (!HAUtil.isHAEnabled(conf, nameserviceId)) {
-      editLog.initJournalsForWrite();
-    } else {
-      editLog.initSharedJournalsForRead();
-    }
 
     archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
   }
@@ -496,6 +490,7 @@ public class FSImage implements Closeable {
     // return back the real image
     realImage.getStorage().setStorageInfo(ckptImage.getStorage());
     realImage.getEditLog().setNextTxId(ckptImage.getEditLog().getLastWrittenTxId()+1);
+    realImage.initEditLog();
 
     target.dir.fsImage = realImage;
     realImage.getStorage().setBlockPoolID(ckptImage.getBlockPoolID());
@@ -568,10 +563,8 @@ public class FSImage implements Closeable {
 
     Iterable<EditLogInputStream> editStreams = null;
 
-    if (editLog.isOpenForWrite()) {
-      // We only want to recover streams if we're going into Active mode.
-      editLog.recoverUnclosedStreams();
-    }
+    initEditLog();
+
     if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT,
                                getLayoutVersion())) {
       // If we're open for write, we're either non-HA or we're the active NN, so
@@ -629,6 +622,17 @@ public class FSImage implements Closeable {
     return needToSave;
   }
 
+  public void initEditLog() {
+    Preconditions.checkState(getNamespaceID() != 0,
+        "Must know namespace ID before initting edit log");
+    String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
+    if (!HAUtil.isHAEnabled(conf, nameserviceId)) {
+      editLog.initJournalsForWrite();
+      editLog.recoverUnclosedStreams();
+    } else {
+      editLog.initSharedJournalsForRead();
+    }
+  }
 
   /**
    * @param imageFile the image file that was loaded
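Pulling journal initialization out of the FSImage constructor into an explicit initEditLog() call lets a caller establish the namespace identity before any JournalManager is built, which is exactly what the Preconditions check guards. A sketch of the resulting call order, mirroring the BootstrapStandby hunk further down (the helper class and variable names are assumptions):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.FSImage;

class InitOrderSketch {  // hypothetical helper, not part of the patch
  static FSImage load(Configuration conf, StorageInfo storage)
      throws IOException {
    FSImage image = new FSImage(conf);           // no longer initializes journals
    image.getStorage().setStorageInfo(storage);  // namespace ID becomes known here
    image.initEditLog();                         // journals now get a valid NamespaceInfo
    return image;
  }
}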
@@ -238,12 +238,6 @@ class FileJournalManager implements JournalManager {
   static void addStreamsToCollectionFromFiles(Collection<EditLogFile> elfs,
       Collection<EditLogInputStream> streams, long fromTxId, boolean inProgressOk) {
     for (EditLogFile elf : elfs) {
-      if (elf.lastTxId < fromTxId) {
-        LOG.debug("passing over " + elf + " because it ends at " +
-            elf.lastTxId + ", but we only care about transactions " +
-            "as new as " + fromTxId);
-        continue;
-      }
       if (elf.isInProgress()) {
         if (!inProgressOk) {
           LOG.debug("passing over " + elf + " because it is in progress " +
@@ -258,6 +252,13 @@ class FileJournalManager implements JournalManager {
           continue;
         }
       }
+      if (elf.lastTxId < fromTxId) {
+        assert elf.lastTxId != HdfsConstants.INVALID_TXID;
+        LOG.debug("passing over " + elf + " because it ends at " +
+            elf.lastTxId + ", but we only care about transactions " +
+            "as new as " + fromTxId);
+        continue;
+      }
       EditLogFileInputStream elfis = new EditLogFileInputStream(elf.getFile(),
           elf.getFirstTxId(), elf.getLastTxId(), elf.isInProgress());
       LOG.debug("selecting edit log stream " + elf);
@@ -1002,4 +1002,12 @@ public class NNStorage extends Storage implements Closeable,
     inspectStorageDirs(inspector);
     return inspector;
   }
+
+  public NamespaceInfo getNamespaceInfo() {
+    return new NamespaceInfo(
+        getNamespaceID(),
+        getClusterID(),
+        getBlockPoolID(),
+        getCTime());
+  }
 }
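The new accessor gives callers one place to assemble a NamespaceInfo from the storage fields instead of repeating the four-argument constructor; the NameNode hunk below is the first caller converted. A before/after sketch (existingStorage is the NNStorage instance from that hunk):

// Before: each caller repeated the field order by hand.
NamespaceInfo nsInfo = new NamespaceInfo(
    existingStorage.getNamespaceID(),
    existingStorage.getClusterID(),
    existingStorage.getBlockPoolID(),
    existingStorage.getCTime());

// After: one authoritative accessor on NNStorage.
NamespaceInfo nsInfo = existingStorage.getNamespaceInfo();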
@@ -64,7 +64,6 @@ import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -863,11 +862,7 @@ public class NameNode {
         Lists.<URI>newArrayList(),
         sharedEditsDirs);
 
-    newSharedStorage.format(new NamespaceInfo(
-        existingStorage.getNamespaceID(),
-        existingStorage.getClusterID(),
-        existingStorage.getBlockPoolID(),
-        existingStorage.getCTime()));
+    newSharedStorage.format(existingStorage.getNamespaceInfo());
 
     // Need to make sure the edit log segments are in good shape to initialize
     // the shared edits dir.
@@ -189,6 +189,8 @@ public class BootstrapStandby implements Tool, Configurable {
     // Load the newly formatted image, using all of the directories (including shared
     // edits)
     FSImage image = new FSImage(conf);
+    image.getStorage().setStorageInfo(storage);
+    image.initEditLog();
     assert image.getEditLog().isOpenForRead() :
         "Expected edit log to be open for read";
 
@@ -18,18 +18,23 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.junit.Assert.fail;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.Test;
 
 public class TestGenericJournalConf {
+  private static final String DUMMY_URI = "dummy://test";
+
   /**
    * Test that an exception is thrown if a journal class doesn't exist
    * in the configuration
@@ -114,12 +119,17 @@ public class TestGenericJournalConf {
 
     conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_PLUGIN_PREFIX + ".dummy",
              DummyJournalManager.class.getName());
-    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
-             "dummy://test");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, DUMMY_URI);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_KEY, 0);
     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
       cluster.waitActive();
+
+      assertNotNull(DummyJournalManager.conf);
+      assertEquals(new URI(DUMMY_URI), DummyJournalManager.uri);
+      assertNotNull(DummyJournalManager.nsInfo);
+      assertEquals(DummyJournalManager.nsInfo.getClusterID(),
+          cluster.getNameNode().getNamesystem().getClusterId());
     } finally {
       if (cluster != null) {
         cluster.shutdown();
@@ -128,7 +138,17 @@ public class TestGenericJournalConf {
   }
 
   public static class DummyJournalManager implements JournalManager {
-    public DummyJournalManager(Configuration conf, URI u) {}
+    static Configuration conf = null;
+    static URI uri = null;
+    static NamespaceInfo nsInfo = null;
+
+    public DummyJournalManager(Configuration conf, URI u,
+        NamespaceInfo nsInfo) {
+      // Set static vars so the test case can verify them.
+      DummyJournalManager.conf = conf;
+      DummyJournalManager.uri = u;
+      DummyJournalManager.nsInfo = nsInfo;
+    }
 
     @Override
     public EditLogOutputStream startLogSegment(long txId) throws IOException {
@@ -162,7 +182,7 @@ public class TestGenericJournalConf {
 
   public static class BadConstructorJournalManager extends DummyJournalManager {
     public BadConstructorJournalManager() {
-      super(null, null);
+      super(null, null, null);
     }
   }
 }