From bb30abf774938e5f639b3efcf6d71fe0810342bb Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Mon, 3 Jun 2013 14:55:57 +0000 Subject: [PATCH] Try to avoid the 'short record at position' LevelDB error reported at: http://activemq.2283324.n4.nabble.com/Activemq-5-9-leveldb-replication-issue-tp4667495p4667674.html git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1489019 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/transport/amqp/IDERunner.java | 36 +++++++++++++++++++ .../apache/activemq/leveldb/RecordLog.scala | 13 +++++-- 2 files changed, 47 insertions(+), 2 deletions(-) create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/IDERunner.java diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/IDERunner.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/IDERunner.java new file mode 100644 index 0000000000..4bd2c3ed9c --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/IDERunner.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.leveldb.LevelDBStore; + +import java.io.File; + +public class IDERunner { + + public static void main(String[]args) throws Exception { + BrokerService bs = new BrokerService(); + bs.addConnector("tcp://localhost:61616"); + LevelDBStore store = new LevelDBStore(); + store.setDirectory(new File("target/activemq-data/haleveldb")); + bs.setPersistenceAdapter(store); + bs.deleteAllMessages(); + bs.start(); + bs.waitUntilStopped(); + } +} diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala index 70adc724e8..0154232fee 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala @@ -283,8 +283,17 @@ case class RecordLog(directory: File, logSuffix:String) { data } else { val data = new Buffer(length) - if( channel.read(data.toByteBuffer, offset+LOG_HEADER_SIZE) != data.length ) { - throw new IOException("short record at position: "+record_position+" in file: "+file+", offset: "+offset) + var bb = data.toByteBuffer + var position = offset+LOG_HEADER_SIZE + while( bb.hasRemaining ) { + var count = channel.read(bb, position) + if( count == 0 ) { + throw new IOException("zero read at file '%s' offset: %d".format(file, position)) + } + if( count < 0 ) { + throw new EOFException("File '%s' offset: %d".format(file, position)) + } + position += count } data }