diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 55001e81df4..df434f48263 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -59,6 +59,9 @@ Trunk (unreleased changes) HDFS-3131. Improve TestStorageRestore. (Brandon Li via atm) + HDFS-3178. Add states and state handler for journal synchronization in + JournalService. (szetszwo) + OPTIMIZATIONS HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java new file mode 100644 index 00000000000..5d93a4cbaea --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.journalservice; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; + +/** + * JournalListener is a callback interface to handle journal records + * received from the namenode. + */ +public interface JournalListener { + /** + * Check the namespace information returned by a namenode + * @param service service that is making the callback + * @param info returned namespace information from the namenode + * + * The application using {@link JournalService} can stop the service if + * {@code info} validation fails. + */ + public void verifyVersion(JournalService service, NamespaceInfo info); + + /** + * Process the received Journal record + * @param service {@link JournalService} making the callback + * @param firstTxnId first transaction Id in the journal + * @param numTxns number of records + * @param records journal records + * @throws IOException on error + * + * Any IOException thrown from the listener is thrown back in + * {@link JournalProtocol#journal} + */ + public void journal(JournalService service, long firstTxnId, int numTxns, + byte[] records) throws IOException; + + /** + * Roll the editlog + * @param service {@link JournalService} making the callback + * @param txid transaction ID to roll at + * + * Any IOException thrown from the listener is thrown back in + * {@link JournalProtocol#startLogSegment} + */ + public void rollLogs(JournalService service, long txid) throws IOException; +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java similarity index 60% rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalService.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java index d62ed0e6c07..71210c6140d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.namenode; +package org.apache.hadoop.hdfs.server.journalservice; import java.io.IOException; import java.net.InetSocketAddress; @@ -23,15 +23,14 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.protocol.LayoutVersion; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtocolService; import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB; import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB; -import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; +import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; @@ -60,87 +59,90 @@ */ public class JournalService implements JournalProtocol { public static final Log LOG = LogFactory.getLog(JournalService.class.getName()); + private final JournalListener listener; - private final boolean internalRpcServer; private final InetSocketAddress nnAddress; private final NamenodeRegistration registration; private final NamenodeProtocol namenode; - private volatile State state = State.INIT; - private RPC.Server rpcServer; + private final StateHandler stateHandler = new StateHandler(); + private final RPC.Server rpcServer; enum State { - INIT, - STARTING_UP, - RUNNING, - STOPPED; + /** The service is initialized and ready to start. */ + INIT(false, false), + /** + * RPC server is started. + * The service is ready to receive requests from namenode. + */ + STARTED(false, false), + /** The service is fenced by a namenode and waiting for roll. */ + WAITING_FOR_ROLL(false, true), + /** + * The existing log is syncing with another source + * and it accepts journal from Namenode. + */ + SYNCING(true, true), + /** The existing log is in sync and it accepts journal from Namenode. */ + IN_SYNC(true, true), + /** The service is stopped. */ + STOPPED(false, false); + + final boolean isJournalAllowed; + final boolean isStartLogSegmentAllowed; + + State(boolean isJournalAllowed, boolean isStartLogSegmentAllowed) { + this.isJournalAllowed = isJournalAllowed; + this.isStartLogSegmentAllowed = isStartLogSegmentAllowed; + } } - /** - * JournalListener is a callback interface to handle journal records - * received from the namenode. - */ - public interface JournalListener { - /** - * Check the namespace information returned by a namenode - * @param service service that is making the callback - * @param info returned namespace information from the namenode - * - * The application using {@link JournalService} can stop the service if - * {@code info} validation fails. - */ - public void verifyVersion(JournalService service, NamespaceInfo info); + static class StateHandler { + State current = State.INIT; - /** - * Process the received Journal record - * @param service {@link JournalService} making the callback - * @param firstTxnId first transaction Id in the journal - * @param numTxns number of records - * @param records journal records - * @throws IOException on error - * - * Any IOException thrown from the listener is thrown back in - * {@link JournalProtocol#journal} - */ - public void journal(JournalService service, long firstTxnId, int numTxns, - byte[] records) throws IOException; - - /** - * Roll the editlog - * @param service {@link JournalService} making the callback - * @param txid transaction ID to roll at - * - * Any IOException thrown from the listener is thrown back in - * {@link JournalProtocol#startLogSegment} - */ - public void rollLogs(JournalService service, long txid) throws IOException; - } - - /** - * Constructor to create {@link JournalService} based on an existing RPC server. - * After creating the service, the caller needs to start the RPC server. - * - * @param conf Configuration - * @param nnAddr host:port for the active Namenode's RPC server - * @param listener call-back interface to listen to journal activities - * @param rpcServer RPC server if the application has already one, which can be - * reused. If this is null, then the RPC server is started by - * {@link JournalService} - * @param reg namenode registration information if there is one already, say - * if you are using this service in namenode. If it is null, then the - * service creates a new registration. - * @throws IOException on error - */ - JournalService(Configuration conf, InetSocketAddress nnAddr, - JournalListener listener, RPC.Server rpcServer, NamenodeRegistration reg) - throws IOException { - this.nnAddress = nnAddr; - this.listener = listener; - this.registration = reg; - this.internalRpcServer = false; - this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddr, - NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), true) - .getProxy(); - initRpcServer(conf, rpcServer); + synchronized void start() { + if (current != State.INIT) { + throw new IllegalStateException("Service cannot be started in " + + current + " state."); + } + current = State.STARTED; + } + + synchronized void waitForRoll() { + if (current != State.STARTED) { + throw new IllegalStateException("Cannot wait-for-roll in " + current + + " state."); + } + current = State.WAITING_FOR_ROLL; + } + + synchronized void startLogSegment() throws IOException { + if (current == State.WAITING_FOR_ROLL) { + current = State.SYNCING; + } + } + + synchronized void isStartLogSegmentAllowed() throws IOException { + if (!current.isStartLogSegmentAllowed) { + throw new IOException("Cannot start log segment in " + current + + " state."); + } + } + + synchronized void isJournalAllowed() throws IOException { + if (!current.isJournalAllowed) { + throw new IOException("Cannot journal in " + current + " state."); + } + } + + synchronized boolean isStopped() { + if (current == State.STOPPED) { + LOG.warn("Ignore stop request since the service is in " + current + + " state."); + return true; + } + current = State.STOPPED; + return false; + } } /** @@ -160,11 +162,11 @@ public void journal(JournalService service, long firstTxnId, int numTxns, throws IOException { this.nnAddress = nnAddr; this.listener = listener; - this.internalRpcServer = true; this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddr, NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), true) .getProxy(); - initRpcServer(conf, serverAddress); + this.rpcServer = createRpcServer(conf, serverAddress, this); + String addr = NetUtils.getHostPortString(rpcServer.getListenerAddress()); StorageInfo storage = new StorageInfo( LayoutVersion.getCurrentLayoutVersion(), 0, "", 0); @@ -176,23 +178,13 @@ public void journal(JournalService service, long firstTxnId, int numTxns, * Start the service. */ public void start() { - synchronized(this) { - if (state != State.INIT) { - LOG.info("Service cannot be started in state - " + state); - return; - } - state = State.STARTING_UP; - } + stateHandler.start(); + // Start the RPC server - if (internalRpcServer) { - LOG.info("Starting rpc server"); - rpcServer.start(); - } - - boolean registered = false; - boolean handshakeComplete = false; - boolean rollEdits = false; - while (state == State.STARTING_UP) { + LOG.info("Starting rpc server"); + rpcServer.start(); + + for(boolean registered = false, handshakeComplete = false; ; ) { try { // Perform handshake if (!handshakeComplete) { @@ -206,12 +198,6 @@ public void start() { registerWithNamenode(); registered = true; LOG.info("Registration completed"); - } - - if (!rollEdits) { - namenode.rollEditLog(); - rollEdits = true; - LOG.info("Editlog roll completed"); break; } } catch (IOException ioe) { @@ -226,10 +212,13 @@ public void start() { LOG.warn("Encountered exception ", ie); } } - synchronized(this) { - state = State.RUNNING; + + stateHandler.waitForRoll(); + try { + namenode.rollEditLog(); + } catch (IOException e) { + LOG.warn("Encountered exception ", e); } - } /** @@ -237,15 +226,8 @@ public void start() { * RPC Server must be stopped the application. */ public void stop() { - synchronized (this) { - if (state == State.STOPPED) { - return; - } - state = State.STOPPED; - } - if (internalRpcServer && rpcServer != null) { + if (!stateHandler.isStopped()) { rpcServer.stop(); - rpcServer = null; } } @@ -255,6 +237,7 @@ public void journal(NamenodeRegistration registration, long firstTxnId, if (LOG.isTraceEnabled()) { LOG.trace("Received journal " + firstTxnId + " " + numTxns); } + stateHandler.isJournalAllowed(); verify(registration); listener.journal(this, firstTxnId, numTxns, records); } @@ -265,37 +248,23 @@ public void startLogSegment(NamenodeRegistration registration, long txid) if (LOG.isTraceEnabled()) { LOG.trace("Received startLogSegment " + txid); } + stateHandler.isStartLogSegmentAllowed(); verify(registration); listener.rollLogs(this, txid); + stateHandler.startLogSegment(); } - /** - * Stand alone mode where RPC Server is created and managed by this service - */ - private void initRpcServer(Configuration conf, InetSocketAddress serverAddress) - throws IOException { + /** Create an RPC server. */ + private static RPC.Server createRpcServer(Configuration conf, + InetSocketAddress address, JournalProtocol impl) throws IOException { RPC.setProtocolEngine(conf, JournalProtocolPB.class, ProtobufRpcEngine.class); JournalProtocolServerSideTranslatorPB xlator = - new JournalProtocolServerSideTranslatorPB(this); + new JournalProtocolServerSideTranslatorPB(impl); BlockingService service = JournalProtocolService.newReflectiveBlockingService(xlator); - rpcServer = RPC.getServer(JournalProtocolPB.class, service, - serverAddress.getHostName(), serverAddress.getPort(), 1, false, conf, - null); - } - - /** - * RPC Server is created and managed by the application - used by this service - */ - private void initRpcServer(Configuration conf, RPC.Server server) - throws IOException { - rpcServer = server; - JournalProtocolServerSideTranslatorPB xlator = - new JournalProtocolServerSideTranslatorPB(this); - BlockingService service = - JournalProtocolService.newReflectiveBlockingService(xlator); - DFSUtil.addPBProtocol(conf, JournalProtocolPB.class, service, rpcServer); + return RPC.getServer(JournalProtocolPB.class, service, + address.getHostName(), address.getPort(), 1, false, conf, null); } private void verify(NamenodeRegistration reg) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestJournalService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java similarity index 96% rename from hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestJournalService.java rename to hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java index 2977e30ee32..03c511f3195 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestJournalService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs.server.namenode; +package org.apache.hadoop.hdfs.server.journalservice; import java.io.IOException; import java.net.InetSocketAddress; @@ -26,7 +26,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.server.namenode.JournalService.JournalListener; import org.junit.Test; import org.mockito.Mockito;