diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index b4ca510dfa8..a80b743d9ed 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -58,6 +58,9 @@ Release 2.5.0 - UNRELEASED YARN-1982. Renamed the daemon name to be TimelineServer instead of History Server and deprecated the old usage. (Zhijie Shen via vinodkv) + YARN-1987. Wrapper for leveldb DBIterator to aid in handling database exceptions. + (Jason Lowe via kasha) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index 1e8be2fae06..fdb6334521d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -103,6 +103,10 @@ org.apache.zookeeper zookeeper + + org.fusesource.leveldbjni + leveldbjni-all + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/LeveldbIterator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/LeveldbIterator.java new file mode 100644 index 00000000000..f33cb5f1d89 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/LeveldbIterator.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.utils; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.iq80.leveldb.DB; +import org.iq80.leveldb.DBException; +import org.iq80.leveldb.DBIterator; +import org.iq80.leveldb.ReadOptions; + +/** + * A wrapper for a DBIterator to translate the raw RuntimeExceptions that + * can be thrown into DBExceptions. + */ +@Public +@Evolving +public class LeveldbIterator implements Iterator>, + Closeable { + private DBIterator iter; + + /** + * Create an iterator for the specified database + */ + public LeveldbIterator(DB db) { + iter = db.iterator(); + } + + /** + * Create an iterator for the specified database + */ + public LeveldbIterator(DB db, ReadOptions options) { + iter = db.iterator(options); + } + + /** + * Create an iterator using the specified underlying DBIterator + */ + public LeveldbIterator(DBIterator iter) { + this.iter = iter; + } + + /** + * Repositions the iterator so the key of the next BlockElement + * returned greater than or equal to the specified targetKey. + */ + public void seek(byte[] key) throws DBException { + try { + iter.seek(key); + } catch (DBException e) { + throw e; + } catch (RuntimeException e) { + throw new DBException(e.getMessage(), e); + } + } + + /** + * Repositions the iterator so is is at the beginning of the Database. + */ + public void seekToFirst() throws DBException { + try { + iter.seekToFirst(); + } catch (DBException e) { + throw e; + } catch (RuntimeException e) { + throw new DBException(e.getMessage(), e); + } + } + + /** + * Repositions the iterator so it is at the end of of the Database. + */ + public void seekToLast() throws DBException { + try { + iter.seekToLast(); + } catch (DBException e) { + throw e; + } catch (RuntimeException e) { + throw new DBException(e.getMessage(), e); + } + } + + /** + * Returns true if the iteration has more elements. + */ + public boolean hasNext() throws DBException { + try { + return iter.hasNext(); + } catch (DBException e) { + throw e; + } catch (RuntimeException e) { + throw new DBException(e.getMessage(), e); + } + } + + /** + * Returns the next element in the iteration. + */ + @Override + public Map.Entry next() throws DBException { + try { + return iter.next(); + } catch (DBException e) { + throw e; + } catch (RuntimeException e) { + throw new DBException(e.getMessage(), e); + } + } + + /** + * Returns the next element in the iteration, without advancing the + * iteration. + */ + public Map.Entry peekNext() throws DBException { + try { + return iter.peekNext(); + } catch (DBException e) { + throw e; + } catch (RuntimeException e) { + throw new DBException(e.getMessage(), e); + } + } + + /** + * @return true if there is a previous entry in the iteration. + */ + public boolean hasPrev() throws DBException { + try { + return iter.hasPrev(); + } catch (DBException e) { + throw e; + } catch (RuntimeException e) { + throw new DBException(e.getMessage(), e); + } + } + + /** + * @return the previous element in the iteration and rewinds the iteration. + */ + public Map.Entry prev() throws DBException { + try { + return iter.prev(); + } catch (DBException e) { + throw e; + } catch (RuntimeException e) { + throw new DBException(e.getMessage(), e); + } + } + + /** + * @return the previous element in the iteration, without rewinding the + * iteration. + */ + public Map.Entry peekPrev() throws DBException { + try { + return iter.peekPrev(); + } catch (DBException e) { + throw e; + } catch (RuntimeException e) { + throw new DBException(e.getMessage(), e); + } + } + + /** + * Removes from the database the last element returned by the iterator. + */ + @Override + public void remove() throws DBException { + try { + iter.remove(); + } catch (DBException e) { + throw e; + } catch (RuntimeException e) { + throw new DBException(e.getMessage(), e); + } + } + + /** + * Closes the iterator. + */ + @Override + public void close() throws IOException { + try { + iter.close(); + } catch (RuntimeException e) { + throw new IOException(e.getMessage(), e); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/utils/TestLeveldbIterator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/utils/TestLeveldbIterator.java new file mode 100644 index 00000000000..12e646deb4d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/utils/TestLeveldbIterator.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.utils; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; + +import org.iq80.leveldb.DBException; +import org.iq80.leveldb.DBIterator; +import org.junit.Test; + +public class TestLeveldbIterator { + private static class CallInfo { + String methodName; + Object[] args; + Class[] argTypes; + + public CallInfo(String methodName, Object... args) { + this.methodName = methodName; + this.args = args; + argTypes = new Class[args.length]; + for (int i = 0; i < args.length; ++i) { + argTypes[i] = args[i].getClass(); + } + } + } + + // array of methods that should throw DBException instead of raw + // runtime exceptions + private static CallInfo[] RTEXC_METHODS = new CallInfo[] { + new CallInfo("seek", new byte[0]), + new CallInfo("seekToFirst"), + new CallInfo("seekToLast"), + new CallInfo("hasNext"), + new CallInfo("next"), + new CallInfo("peekNext"), + new CallInfo("hasPrev"), + new CallInfo("prev"), + new CallInfo("peekPrev"), + new CallInfo("remove") + }; + + @Test + public void testExceptionHandling() throws Exception { + InvocationHandler rtExcHandler = new InvocationHandler() { + @Override + public Object invoke(Object proxy, Method method, Object[] args) + throws Throwable { + throw new RuntimeException("forced runtime error"); + } + }; + DBIterator dbiter = (DBIterator) Proxy.newProxyInstance( + DBIterator.class.getClassLoader(), new Class[] { DBIterator.class }, + rtExcHandler); + LeveldbIterator iter = new LeveldbIterator(dbiter); + for (CallInfo ci : RTEXC_METHODS) { + Method method = iter.getClass().getMethod(ci.methodName, ci.argTypes); + assertNotNull("unable to locate method " + ci.methodName, method); + try { + method.invoke(iter, ci.args); + fail("operation should have thrown"); + } catch (InvocationTargetException ite) { + Throwable exc = ite.getTargetException(); + assertTrue("Method " + ci.methodName + " threw non-DBException: " + + exc, exc instanceof DBException); + assertFalse("Method " + ci.methodName + " double-wrapped DBException", + exc.getCause() instanceof DBException); + } + } + + // check close() throws IOException + try { + iter.close(); + fail("operation shoul have thrown"); + } catch (IOException e) { + // expected + } + } +}