YARN-1987. Wrapper for leveldb DBIterator to aid in handling database exceptions. (Jason Lowe via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1593757 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Karthik Kambatla 2014-05-11 06:11:04 +00:00
parent ae9109b911
commit 4a19196a34
4 changed files with 320 additions and 0 deletions

View File

@ -73,6 +73,9 @@ Release 2.5.0 - UNRELEASED
YARN-1982. Renamed the daemon name to be TimelineServer instead of History YARN-1982. Renamed the daemon name to be TimelineServer instead of History
Server and deprecated the old usage. (Zhijie Shen via vinodkv) Server and deprecated the old usage. (Zhijie Shen via vinodkv)
YARN-1987. Wrapper for leveldb DBIterator to aid in handling database exceptions.
(Jason Lowe via kasha)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -85,6 +85,10 @@
<groupId>org.apache.zookeeper</groupId> <groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId> <artifactId>zookeeper</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -0,0 +1,210 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.utils;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBException;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.ReadOptions;
/**
* A wrapper for a DBIterator to translate the raw RuntimeExceptions that
* can be thrown into DBExceptions.
*/
@Public
@Evolving
public class LeveldbIterator implements Iterator<Map.Entry<byte[], byte[]>>,
Closeable {
private DBIterator iter;
/**
* Create an iterator for the specified database
*/
public LeveldbIterator(DB db) {
iter = db.iterator();
}
/**
* Create an iterator for the specified database
*/
public LeveldbIterator(DB db, ReadOptions options) {
iter = db.iterator(options);
}
/**
* Create an iterator using the specified underlying DBIterator
*/
public LeveldbIterator(DBIterator iter) {
this.iter = iter;
}
/**
* Repositions the iterator so the key of the next BlockElement
* returned greater than or equal to the specified targetKey.
*/
public void seek(byte[] key) throws DBException {
try {
iter.seek(key);
} catch (DBException e) {
throw e;
} catch (RuntimeException e) {
throw new DBException(e.getMessage(), e);
}
}
/**
* Repositions the iterator so is is at the beginning of the Database.
*/
public void seekToFirst() throws DBException {
try {
iter.seekToFirst();
} catch (DBException e) {
throw e;
} catch (RuntimeException e) {
throw new DBException(e.getMessage(), e);
}
}
/**
* Repositions the iterator so it is at the end of of the Database.
*/
public void seekToLast() throws DBException {
try {
iter.seekToLast();
} catch (DBException e) {
throw e;
} catch (RuntimeException e) {
throw new DBException(e.getMessage(), e);
}
}
/**
* Returns <tt>true</tt> if the iteration has more elements.
*/
public boolean hasNext() throws DBException {
try {
return iter.hasNext();
} catch (DBException e) {
throw e;
} catch (RuntimeException e) {
throw new DBException(e.getMessage(), e);
}
}
/**
* Returns the next element in the iteration.
*/
@Override
public Map.Entry<byte[], byte[]> next() throws DBException {
try {
return iter.next();
} catch (DBException e) {
throw e;
} catch (RuntimeException e) {
throw new DBException(e.getMessage(), e);
}
}
/**
* Returns the next element in the iteration, without advancing the
* iteration.
*/
public Map.Entry<byte[], byte[]> peekNext() throws DBException {
try {
return iter.peekNext();
} catch (DBException e) {
throw e;
} catch (RuntimeException e) {
throw new DBException(e.getMessage(), e);
}
}
/**
* @return true if there is a previous entry in the iteration.
*/
public boolean hasPrev() throws DBException {
try {
return iter.hasPrev();
} catch (DBException e) {
throw e;
} catch (RuntimeException e) {
throw new DBException(e.getMessage(), e);
}
}
/**
* @return the previous element in the iteration and rewinds the iteration.
*/
public Map.Entry<byte[], byte[]> prev() throws DBException {
try {
return iter.prev();
} catch (DBException e) {
throw e;
} catch (RuntimeException e) {
throw new DBException(e.getMessage(), e);
}
}
/**
* @return the previous element in the iteration, without rewinding the
* iteration.
*/
public Map.Entry<byte[], byte[]> peekPrev() throws DBException {
try {
return iter.peekPrev();
} catch (DBException e) {
throw e;
} catch (RuntimeException e) {
throw new DBException(e.getMessage(), e);
}
}
/**
* Removes from the database the last element returned by the iterator.
*/
@Override
public void remove() throws DBException {
try {
iter.remove();
} catch (DBException e) {
throw e;
} catch (RuntimeException e) {
throw new DBException(e.getMessage(), e);
}
}
/**
* Closes the iterator.
*/
@Override
public void close() throws IOException {
try {
iter.close();
} catch (RuntimeException e) {
throw new IOException(e.getMessage(), e);
}
}
}

View File

@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.utils;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import org.iq80.leveldb.DBException;
import org.iq80.leveldb.DBIterator;
import org.junit.Test;
public class TestLeveldbIterator {
private static class CallInfo {
String methodName;
Object[] args;
Class<?>[] argTypes;
public CallInfo(String methodName, Object... args) {
this.methodName = methodName;
this.args = args;
argTypes = new Class[args.length];
for (int i = 0; i < args.length; ++i) {
argTypes[i] = args[i].getClass();
}
}
}
// array of methods that should throw DBException instead of raw
// runtime exceptions
private static CallInfo[] RTEXC_METHODS = new CallInfo[] {
new CallInfo("seek", new byte[0]),
new CallInfo("seekToFirst"),
new CallInfo("seekToLast"),
new CallInfo("hasNext"),
new CallInfo("next"),
new CallInfo("peekNext"),
new CallInfo("hasPrev"),
new CallInfo("prev"),
new CallInfo("peekPrev"),
new CallInfo("remove")
};
@Test
public void testExceptionHandling() throws Exception {
InvocationHandler rtExcHandler = new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
throw new RuntimeException("forced runtime error");
}
};
DBIterator dbiter = (DBIterator) Proxy.newProxyInstance(
DBIterator.class.getClassLoader(), new Class[] { DBIterator.class },
rtExcHandler);
LeveldbIterator iter = new LeveldbIterator(dbiter);
for (CallInfo ci : RTEXC_METHODS) {
Method method = iter.getClass().getMethod(ci.methodName, ci.argTypes);
assertNotNull("unable to locate method " + ci.methodName, method);
try {
method.invoke(iter, ci.args);
fail("operation should have thrown");
} catch (InvocationTargetException ite) {
Throwable exc = ite.getTargetException();
assertTrue("Method " + ci.methodName + " threw non-DBException: "
+ exc, exc instanceof DBException);
assertFalse("Method " + ci.methodName + " double-wrapped DBException",
exc.getCause() instanceof DBException);
}
}
// check close() throws IOException
try {
iter.close();
fail("operation shoul have thrown");
} catch (IOException e) {
// expected
}
}
}