JN | Status |
" + JspUtil.escapeXml(l.toString()) + " | "); + sb.append(""); + l.appendHtmlReport(sb); + sb.append(" |
minTxIdToKeep
parameter
+ * are removed.
+ */
+ private static void purgeMatching(File dir, ListJournal Manager | State |
" + jas.getManager()); + if (jas.isRequired()) { + out.print(" [required]"); + } + out.print(" | "); + + if (jas.isDisabled()) { + out.print("Failed"); + } else if (openForWrite) { + EditLogOutputStream elos = jas.getCurrentStream(); + if (elos != null) { + out.println(elos.generateHtmlReport()); + } else { + out.println("not currently writing"); + } + } else { + out.println("open for read"); + } + out.println(" |
long
+ * value, but does not make any effort to make it truly durable. This is in
+ * contrast to {@link PersistentLongFile} which fsync()s the value on every
+ * change.
+ *
+ * This should be used for values which are updated frequently (such that
+ * performance is important) and not required to be up-to-date for correctness.
+ *
+ * This class also differs in that it stores the value as binary data instead
+ * of a textual string.
+ */
+@InterfaceAudience.Private
+public class BestEffortLongFile implements Closeable {
+
+ private final File file;
+ private final long defaultVal;
+
+ private long value;
+
+ private FileChannel ch = null;
+
+ private ByteBuffer buf = ByteBuffer.allocate(Long.SIZE/8);
+
+ public BestEffortLongFile(File file, long defaultVal) {
+ this.file = file;
+ this.defaultVal = defaultVal;
+ }
+
+ public long get() throws IOException {
+ lazyOpen();
+ return value;
+ }
+
+ public void set(long newVal) throws IOException {
+ lazyOpen();
+ buf.clear();
+ buf.putLong(newVal);
+ buf.flip();
+ IOUtils.writeFully(ch, buf, 0);
+ value = newVal;
+ }
+
+ private void lazyOpen() throws IOException {
+ if (ch != null) {
+ return;
+ }
+
+ // Load current value.
+ byte[] data = null;
+ try {
+ data = Files.toByteArray(file);
+ } catch (FileNotFoundException fnfe) {
+ // Expected - this will use default value.
+ }
+
+ if (data != null && data.length != 0) {
+ if (data.length != Longs.BYTES) {
+ throw new IOException("File " + file + " had invalid length: " +
+ data.length);
+ }
+ value = Longs.fromByteArray(data);
+ } else {
+ value = defaultVal;
+ }
+
+ // Now open file for future writes.
+ RandomAccessFile raf = new RandomAccessFile(file, "rw");
+ try {
+ ch = raf.getChannel();
+ } finally {
+ if (ch == null) {
+ IOUtils.closeStream(raf);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (ch != null) {
+ ch.close();
+ }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java
index 5e776226fa6..292d0dfe63e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java
@@ -57,7 +57,9 @@ public class PersistentLongFile {
}
public void set(long newVal) throws IOException {
- writeFile(file, newVal);
+ if (value != newVal || !loaded) {
+ writeFile(file, newVal);
+ }
value = newVal;
loaded = true;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
new file mode 100644
index 00000000000..d188f2beca3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.hdfs.qjournal.protocol";
+option java_outer_classname = "QJournalProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "hdfs.proto";
+
+message JournalIdProto {
+ required string identifier = 1;
+}
+
+message RequestInfoProto {
+ required JournalIdProto journalId = 1;
+ required uint64 epoch = 2;
+ required uint64 ipcSerialNumber = 3;
+
+ // Whenever a writer makes a request, it informs
+ // the node of the latest committed txid. This may
+ // be higher than the transaction data included in the
+ // request itself, eg in the case that the node has
+ // fallen behind.
+ optional uint64 committedTxId = 4;
+}
+
+message SegmentStateProto {
+ required uint64 startTxId = 1;
+ required uint64 endTxId = 2;
+ required bool isInProgress = 3;
+}
+
+/**
+ * The storage format used on local disk for previously
+ * accepted decisions.
+ */
+message PersistedRecoveryPaxosData {
+ required SegmentStateProto segmentState = 1;
+ required uint64 acceptedInEpoch = 2;
+}
+
+/**
+ * journal()
+ */
+
+message JournalRequestProto {
+ required RequestInfoProto reqInfo = 1;
+ required uint64 firstTxnId = 2;
+ required uint32 numTxns = 3;
+ required bytes records = 4;
+ required uint64 segmentTxnId = 5;
+}
+
+message JournalResponseProto {
+}
+
+/**
+ * heartbeat()
+ */
+
+message HeartbeatRequestProto {
+ required RequestInfoProto reqInfo = 1;
+}
+
+message HeartbeatResponseProto { // void response
+}
+
+/**
+ * startLogSegment()
+ */
+message StartLogSegmentRequestProto {
+ required RequestInfoProto reqInfo = 1;
+ required uint64 txid = 2; // Transaction ID
+}
+
+message StartLogSegmentResponseProto {
+}
+
+/**
+ * finalizeLogSegment()
+ */
+message FinalizeLogSegmentRequestProto {
+ required RequestInfoProto reqInfo = 1;
+ required uint64 startTxId = 2;
+ required uint64 endTxId = 3;
+}
+
+message FinalizeLogSegmentResponseProto {
+}
+
+/**
+ * purgeLogs()
+ */
+message PurgeLogsRequestProto {
+ required RequestInfoProto reqInfo = 1;
+ required uint64 minTxIdToKeep = 2;
+}
+
+message PurgeLogsResponseProto {
+}
+
+/**
+ * isFormatted()
+ */
+message IsFormattedRequestProto {
+ required JournalIdProto jid = 1;
+}
+
+message IsFormattedResponseProto {
+ required bool isFormatted = 1;
+}
+
+/**
+ * getJournalState()
+ */
+message GetJournalStateRequestProto {
+ required JournalIdProto jid = 1;
+}
+
+message GetJournalStateResponseProto {
+ required uint64 lastPromisedEpoch = 1;
+ required uint32 httpPort = 2;
+}
+
+/**
+ * format()
+ */
+message FormatRequestProto {
+ required JournalIdProto jid = 1;
+ required NamespaceInfoProto nsInfo = 2;
+}
+
+message FormatResponseProto {
+}
+
+/**
+ * newEpoch()
+ */
+message NewEpochRequestProto {
+ required JournalIdProto jid = 1;
+ required NamespaceInfoProto nsInfo = 2;
+ required uint64 epoch = 3;
+}
+
+message NewEpochResponseProto {
+ optional uint64 lastSegmentTxId = 1;
+}
+
+/**
+ * getEditLogManifest()
+ */
+message GetEditLogManifestRequestProto {
+ required JournalIdProto jid = 1;
+ required uint64 sinceTxId = 2; // Transaction ID
+}
+
+message GetEditLogManifestResponseProto {
+ required RemoteEditLogManifestProto manifest = 1;
+ required uint32 httpPort = 2;
+
+ // TODO: we should add nsinfo somewhere
+ // to verify that it matches up with our expectation
+ // required NamespaceInfoProto nsInfo = 2;
+}
+
+/**
+ * prepareRecovery()
+ */
+message PrepareRecoveryRequestProto {
+ required RequestInfoProto reqInfo = 1;
+ required uint64 segmentTxId = 2;
+}
+
+message PrepareRecoveryResponseProto {
+ optional SegmentStateProto segmentState = 1;
+ optional uint64 acceptedInEpoch = 2;
+ required uint64 lastWriterEpoch = 3;
+
+ // The highest committed txid that this logger has ever seen.
+ // This may be higher than the data it actually has, in the case
+ // that it was lagging before the old writer crashed.
+ optional uint64 lastCommittedTxId = 4;
+}
+
+/**
+ * acceptRecovery()
+ */
+message AcceptRecoveryRequestProto {
+ required RequestInfoProto reqInfo = 1;
+
+ /** Details on the segment to recover */
+ required SegmentStateProto stateToAccept = 2;
+
+ /** The URL from which the log may be copied */
+ required string fromURL = 3;
+}
+
+message AcceptRecoveryResponseProto {
+}
+
+
+/**
+ * Protocol used to journal edits to a JournalNode.
+ * See the request and response for details of rpc call.
+ */
+service QJournalProtocolService {
+ rpc isFormatted(IsFormattedRequestProto) returns (IsFormattedResponseProto);
+
+ rpc getJournalState(GetJournalStateRequestProto) returns (GetJournalStateResponseProto);
+
+ rpc newEpoch(NewEpochRequestProto) returns (NewEpochResponseProto);
+
+ rpc format(FormatRequestProto) returns (FormatResponseProto);
+
+ rpc journal(JournalRequestProto) returns (JournalResponseProto);
+
+ rpc heartbeat(HeartbeatRequestProto) returns (HeartbeatResponseProto);
+
+ rpc startLogSegment(StartLogSegmentRequestProto)
+ returns (StartLogSegmentResponseProto);
+
+ rpc finalizeLogSegment(FinalizeLogSegmentRequestProto)
+ returns (FinalizeLogSegmentResponseProto);
+
+ rpc purgeLogs(PurgeLogsRequestProto)
+ returns (PurgeLogsResponseProto);
+
+ rpc getEditLogManifest(GetEditLogManifestRequestProto)
+ returns (GetEditLogManifestResponseProto);
+
+ rpc prepareRecovery(PrepareRecoveryRequestProto)
+ returns (PrepareRecoveryResponseProto);
+
+ rpc acceptRecovery(AcceptRecoveryRequestProto)
+ returns (AcceptRecoveryResponseProto);
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
index 70a04752ffc..924fc019243 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
@@ -302,6 +302,7 @@ message BlocksWithLocationsProto {
message RemoteEditLogProto {
required uint64 startTxId = 1; // Starting available edit log transaction
required uint64 endTxId = 2; // Ending available edit log transaction
+ optional bool isInProgress = 3 [default = false];
}
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index ae9f5008131..153e21ba15f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -259,6 +259,11 @@
+