HDFS-3178. Add states and state handler for journal synchronization in JournalService.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1310115 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-04-05 22:35:53 +00:00
parent c76b264196
commit 80bbefafaf
4 changed files with 169 additions and 134 deletions

View File

@ -59,6 +59,9 @@ Trunk (unreleased changes)
HDFS-3131. Improve TestStorageRestore. (Brandon Li via atm) HDFS-3131. Improve TestStorageRestore. (Brandon Li via atm)
HDFS-3178. Add states and state handler for journal synchronization in
JournalService. (szetszwo)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream. HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream.

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.journalservice;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
/**
* JournalListener is a callback interface to handle journal records
* received from the namenode.
*/
public interface JournalListener {
/**
* Check the namespace information returned by a namenode
* @param service service that is making the callback
* @param info returned namespace information from the namenode
*
* The application using {@link JournalService} can stop the service if
* {@code info} validation fails.
*/
public void verifyVersion(JournalService service, NamespaceInfo info);
/**
* Process the received Journal record
* @param service {@link JournalService} making the callback
* @param firstTxnId first transaction Id in the journal
* @param numTxns number of records
* @param records journal records
* @throws IOException on error
*
* Any IOException thrown from the listener is thrown back in
* {@link JournalProtocol#journal}
*/
public void journal(JournalService service, long firstTxnId, int numTxns,
byte[] records) throws IOException;
/**
* Roll the editlog
* @param service {@link JournalService} making the callback
* @param txid transaction ID to roll at
*
* Any IOException thrown from the listener is thrown back in
* {@link JournalProtocol#startLogSegment}
*/
public void rollLogs(JournalService service, long txid) throws IOException;
}

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.journalservice;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -23,15 +23,14 @@ import java.net.InetSocketAddress;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.LayoutVersion; import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtocolService; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtocolService;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB; import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@ -60,87 +59,90 @@ import com.google.protobuf.BlockingService;
*/ */
public class JournalService implements JournalProtocol { public class JournalService implements JournalProtocol {
public static final Log LOG = LogFactory.getLog(JournalService.class.getName()); public static final Log LOG = LogFactory.getLog(JournalService.class.getName());
private final JournalListener listener; private final JournalListener listener;
private final boolean internalRpcServer;
private final InetSocketAddress nnAddress; private final InetSocketAddress nnAddress;
private final NamenodeRegistration registration; private final NamenodeRegistration registration;
private final NamenodeProtocol namenode; private final NamenodeProtocol namenode;
private volatile State state = State.INIT; private final StateHandler stateHandler = new StateHandler();
private RPC.Server rpcServer; private final RPC.Server rpcServer;
enum State { enum State {
INIT, /** The service is initialized and ready to start. */
STARTING_UP, INIT(false, false),
RUNNING, /**
STOPPED; * RPC server is started.
* The service is ready to receive requests from namenode.
*/
STARTED(false, false),
/** The service is fenced by a namenode and waiting for roll. */
WAITING_FOR_ROLL(false, true),
/**
* The existing log is syncing with another source
* and it accepts journal from Namenode.
*/
SYNCING(true, true),
/** The existing log is in sync and it accepts journal from Namenode. */
IN_SYNC(true, true),
/** The service is stopped. */
STOPPED(false, false);
final boolean isJournalAllowed;
final boolean isStartLogSegmentAllowed;
State(boolean isJournalAllowed, boolean isStartLogSegmentAllowed) {
this.isJournalAllowed = isJournalAllowed;
this.isStartLogSegmentAllowed = isStartLogSegmentAllowed;
}
} }
/** static class StateHandler {
* JournalListener is a callback interface to handle journal records State current = State.INIT;
* received from the namenode.
*/
public interface JournalListener {
/**
* Check the namespace information returned by a namenode
* @param service service that is making the callback
* @param info returned namespace information from the namenode
*
* The application using {@link JournalService} can stop the service if
* {@code info} validation fails.
*/
public void verifyVersion(JournalService service, NamespaceInfo info);
/** synchronized void start() {
* Process the received Journal record if (current != State.INIT) {
* @param service {@link JournalService} making the callback throw new IllegalStateException("Service cannot be started in "
* @param firstTxnId first transaction Id in the journal + current + " state.");
* @param numTxns number of records }
* @param records journal records current = State.STARTED;
* @throws IOException on error
*
* Any IOException thrown from the listener is thrown back in
* {@link JournalProtocol#journal}
*/
public void journal(JournalService service, long firstTxnId, int numTxns,
byte[] records) throws IOException;
/**
* Roll the editlog
* @param service {@link JournalService} making the callback
* @param txid transaction ID to roll at
*
* Any IOException thrown from the listener is thrown back in
* {@link JournalProtocol#startLogSegment}
*/
public void rollLogs(JournalService service, long txid) throws IOException;
} }
/** synchronized void waitForRoll() {
* Constructor to create {@link JournalService} based on an existing RPC server. if (current != State.STARTED) {
* After creating the service, the caller needs to start the RPC server. throw new IllegalStateException("Cannot wait-for-roll in " + current
* + " state.");
* @param conf Configuration }
* @param nnAddr host:port for the active Namenode's RPC server current = State.WAITING_FOR_ROLL;
* @param listener call-back interface to listen to journal activities }
* @param rpcServer RPC server if the application has already one, which can be
* reused. If this is null, then the RPC server is started by synchronized void startLogSegment() throws IOException {
* {@link JournalService} if (current == State.WAITING_FOR_ROLL) {
* @param reg namenode registration information if there is one already, say current = State.SYNCING;
* if you are using this service in namenode. If it is null, then the }
* service creates a new registration. }
* @throws IOException on error
*/ synchronized void isStartLogSegmentAllowed() throws IOException {
JournalService(Configuration conf, InetSocketAddress nnAddr, if (!current.isStartLogSegmentAllowed) {
JournalListener listener, RPC.Server rpcServer, NamenodeRegistration reg) throw new IOException("Cannot start log segment in " + current
throws IOException { + " state.");
this.nnAddress = nnAddr; }
this.listener = listener; }
this.registration = reg;
this.internalRpcServer = false; synchronized void isJournalAllowed() throws IOException {
this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddr, if (!current.isJournalAllowed) {
NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), true) throw new IOException("Cannot journal in " + current + " state.");
.getProxy(); }
initRpcServer(conf, rpcServer); }
synchronized boolean isStopped() {
if (current == State.STOPPED) {
LOG.warn("Ignore stop request since the service is in " + current
+ " state.");
return true;
}
current = State.STOPPED;
return false;
}
} }
/** /**
@ -160,11 +162,11 @@ public class JournalService implements JournalProtocol {
throws IOException { throws IOException {
this.nnAddress = nnAddr; this.nnAddress = nnAddr;
this.listener = listener; this.listener = listener;
this.internalRpcServer = true;
this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddr, this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddr,
NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), true) NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), true)
.getProxy(); .getProxy();
initRpcServer(conf, serverAddress); this.rpcServer = createRpcServer(conf, serverAddress, this);
String addr = NetUtils.getHostPortString(rpcServer.getListenerAddress()); String addr = NetUtils.getHostPortString(rpcServer.getListenerAddress());
StorageInfo storage = new StorageInfo( StorageInfo storage = new StorageInfo(
LayoutVersion.getCurrentLayoutVersion(), 0, "", 0); LayoutVersion.getCurrentLayoutVersion(), 0, "", 0);
@ -176,23 +178,13 @@ public class JournalService implements JournalProtocol {
* Start the service. * Start the service.
*/ */
public void start() { public void start() {
synchronized(this) { stateHandler.start();
if (state != State.INIT) {
LOG.info("Service cannot be started in state - " + state);
return;
}
state = State.STARTING_UP;
}
// Start the RPC server // Start the RPC server
if (internalRpcServer) {
LOG.info("Starting rpc server"); LOG.info("Starting rpc server");
rpcServer.start(); rpcServer.start();
}
boolean registered = false; for(boolean registered = false, handshakeComplete = false; ; ) {
boolean handshakeComplete = false;
boolean rollEdits = false;
while (state == State.STARTING_UP) {
try { try {
// Perform handshake // Perform handshake
if (!handshakeComplete) { if (!handshakeComplete) {
@ -206,12 +198,6 @@ public class JournalService implements JournalProtocol {
registerWithNamenode(); registerWithNamenode();
registered = true; registered = true;
LOG.info("Registration completed"); LOG.info("Registration completed");
}
if (!rollEdits) {
namenode.rollEditLog();
rollEdits = true;
LOG.info("Editlog roll completed");
break; break;
} }
} catch (IOException ioe) { } catch (IOException ioe) {
@ -226,10 +212,13 @@ public class JournalService implements JournalProtocol {
LOG.warn("Encountered exception ", ie); LOG.warn("Encountered exception ", ie);
} }
} }
synchronized(this) {
state = State.RUNNING;
}
stateHandler.waitForRoll();
try {
namenode.rollEditLog();
} catch (IOException e) {
LOG.warn("Encountered exception ", e);
}
} }
/** /**
@ -237,15 +226,8 @@ public class JournalService implements JournalProtocol {
* RPC Server must be stopped the application. * RPC Server must be stopped the application.
*/ */
public void stop() { public void stop() {
synchronized (this) { if (!stateHandler.isStopped()) {
if (state == State.STOPPED) {
return;
}
state = State.STOPPED;
}
if (internalRpcServer && rpcServer != null) {
rpcServer.stop(); rpcServer.stop();
rpcServer = null;
} }
} }
@ -255,6 +237,7 @@ public class JournalService implements JournalProtocol {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Received journal " + firstTxnId + " " + numTxns); LOG.trace("Received journal " + firstTxnId + " " + numTxns);
} }
stateHandler.isJournalAllowed();
verify(registration); verify(registration);
listener.journal(this, firstTxnId, numTxns, records); listener.journal(this, firstTxnId, numTxns, records);
} }
@ -265,37 +248,23 @@ public class JournalService implements JournalProtocol {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Received startLogSegment " + txid); LOG.trace("Received startLogSegment " + txid);
} }
stateHandler.isStartLogSegmentAllowed();
verify(registration); verify(registration);
listener.rollLogs(this, txid); listener.rollLogs(this, txid);
stateHandler.startLogSegment();
} }
/** /** Create an RPC server. */
* Stand alone mode where RPC Server is created and managed by this service private static RPC.Server createRpcServer(Configuration conf,
*/ InetSocketAddress address, JournalProtocol impl) throws IOException {
private void initRpcServer(Configuration conf, InetSocketAddress serverAddress)
throws IOException {
RPC.setProtocolEngine(conf, JournalProtocolPB.class, RPC.setProtocolEngine(conf, JournalProtocolPB.class,
ProtobufRpcEngine.class); ProtobufRpcEngine.class);
JournalProtocolServerSideTranslatorPB xlator = JournalProtocolServerSideTranslatorPB xlator =
new JournalProtocolServerSideTranslatorPB(this); new JournalProtocolServerSideTranslatorPB(impl);
BlockingService service = BlockingService service =
JournalProtocolService.newReflectiveBlockingService(xlator); JournalProtocolService.newReflectiveBlockingService(xlator);
rpcServer = RPC.getServer(JournalProtocolPB.class, service, return RPC.getServer(JournalProtocolPB.class, service,
serverAddress.getHostName(), serverAddress.getPort(), 1, false, conf, address.getHostName(), address.getPort(), 1, false, conf, null);
null);
}
/**
* RPC Server is created and managed by the application - used by this service
*/
private void initRpcServer(Configuration conf, RPC.Server server)
throws IOException {
rpcServer = server;
JournalProtocolServerSideTranslatorPB xlator =
new JournalProtocolServerSideTranslatorPB(this);
BlockingService service =
JournalProtocolService.newReflectiveBlockingService(xlator);
DFSUtil.addPBProtocol(conf, JournalProtocolPB.class, service, rpcServer);
} }
private void verify(NamenodeRegistration reg) throws IOException { private void verify(NamenodeRegistration reg) throws IOException {

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.journalservice;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.JournalService.JournalListener;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;