mirror of https://github.com/apache/activemq.git
leveldb store: Add more log traces for when we need to get more details on what's going on.
parent b1d8cbe4cd
commit a907fc9e94
@@ -602,7 +602,7 @@ class LevelDBClient(store: LevelDBStore) {
         Some(this.getClass.getClassLoader.loadClass(name).newInstance().asInstanceOf[DBFactory])
       } catch {
         case e:Throwable =>
-          debug(e, "Could not load factory: "+name+" due to: "+e)
+          debug("Could not load factory: "+name+" due to: "+e)
           None
       }
     }.headOption.getOrElse(throw new Exception("Could not load any of the index factory classes: "+factoryNames))
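The hunk above sits inside the index-factory selection logic: each candidate class name is tried in order and the first one that instantiates wins. A standalone sketch of that pattern (the candidate class names and the plain stderr logging are illustrative assumptions, not taken from this diff):

// Sketch only: candidate class names are placeholders for illustration.
val factoryNames = List(
  "org.fusesource.leveldbjni.JniDBFactory",
  "org.iq80.leveldb.impl.Iq80DBFactory")

val factory = factoryNames.flatMap { name =>
  try {
    // Same shape as the diff: load and instantiate the first factory that works.
    Some(Class.forName(name).newInstance())
  } catch {
    case e: Throwable =>
      // After this commit the failure is logged as a one-line debug message,
      // without passing the Throwable along for a stack trace.
      Console.err.println("Could not load factory: " + name + " due to: " + e)
      None
  }
}.headOption.getOrElse(
  throw new Exception("Could not load any of the index factory classes: " + factoryNames))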
@@ -822,7 +822,7 @@ class LevelDBClient(store: LevelDBStore) {
       throw e;
     } finally {
       recoveryLogs = null
-      debug("Replay of journal done")
+      debug("Replay end")
     }
   }
 }
@@ -313,7 +313,7 @@ case class RecordLog(directory: File, logSuffix:String) {
     val prefix = is.readByte()
     if( prefix != LOG_HEADER_PREFIX ) {
       // Does not look like a record.
-      throw new IOException("invalid record position %d (file: %s, offset: %d)".format(record_position, file.getName, offset))
+      throw new IOException("invalid record position %d (file: %s, offset: %d)".format(record_position, file.getAbsolutePath, offset))
     }
     val id = is.readByte()
     val expectedChecksum = is.readInt()
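The reads around this change imply a small record header: a one-byte prefix, a one-byte record id, and a four-byte checksum. A minimal sketch of that validation, assuming a made-up LOG_HEADER_PREFIX value and standalone helper names (not RecordLog's actual internals):

import java.io.{ByteArrayInputStream, DataInputStream, File, IOException}

val LOG_HEADER_PREFIX: Byte = '*'.toByte   // assumed value, for illustration only

def readRecordHeader(bytes: Array[Byte], file: File, offset: Long, recordPosition: Long): (Byte, Int) = {
  val is = new DataInputStream(new ByteArrayInputStream(bytes))
  val prefix = is.readByte()
  if (prefix != LOG_HEADER_PREFIX) {
    // The patched message reports the absolute path, which is far more useful
    // when several stores log to files with the same base name.
    throw new IOException("invalid record position %d (file: %s, offset: %d)"
      .format(recordPosition, file.getAbsolutePath, offset))
  }
  val id = is.readByte()
  val expectedChecksum = is.readInt()
  (id, expectedChecksum)
}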
@@ -236,6 +236,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
     if( login == null || slave_state == null) {
       return;
     }
+    trace("%s: Got WAL ack, position: %d, from: %s", directory, req.position, slave_state.slave_id)
     slave_state.position_update(req.position)
   }
 
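The new trace makes it easy to correlate each WAL ack with the slave that sent it. The idea behind position_update, reduced to a simplified sketch (this is only an illustration of quorum-style ack tracking, not the store's actual implementation):

import scala.collection.mutable

// slave id -> highest WAL position that slave has acknowledged
val ackedPositions = mutable.Map[String, Long]()

def positionUpdate(slaveId: String, position: Long, quorum: Int): Boolean = {
  ackedPositions(slaveId) = math.max(position, ackedPositions.getOrElse(slaveId, 0L))
  // A position counts as replicated once enough slaves have confirmed it.
  ackedPositions.values.count(_ >= position) >= quorum
}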
@@ -398,6 +399,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
     value.date = date
     wal_date = value.date;
     value.sync = (syncToMask & SYNC_TO_REMOTE_DISK)!=0
+    trace("%s: Sending WAL update: (file:%d, offset: %d, length: %d)", directory, value.file, value.offset, value.length)
     val frame1 = ReplicationFrame(WAL_ACTION, JsonCodec.encode(value))
     val frame2 = FileTransferFrame(file, offset, length)
     for( slave <- slaves.values() ) {
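Two trace styles appear in this commit: varargs format strings (as in the line just added) and strings pre-formatted with .format(...) (as in the slave hunks below). A minimal helper in the varargs style, assuming slf4j is on the classpath; the store's own logging trait presumably provides an equivalent, so this is just a sketch:

import org.slf4j.LoggerFactory

object ReplicationTrace {
  private val log = LoggerFactory.getLogger("org.apache.activemq.leveldb.replication")

  // Only pay the cost of String.format when TRACE is actually enabled.
  def trace(format: String, args: Any*): Unit =
    if (log.isTraceEnabled) log.trace(format.format(args: _*))
}

// Usage, mirroring the added line above:
// ReplicationTrace.trace("%s: Sending WAL update: (file:%d, offset: %d, length: %d)",
//                        directory, value.file, value.offset, value.length)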
@@ -178,14 +178,15 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
       if( caughtUp && value.offset ==0 && value.file!=0 ) {
         client.log.rotate
       }
+      trace("%s, Slave WAL update: (file:%s, offset: %d, length: %d)".format(directory, value.file.toHexString, value.offset, value.length))
       val file = client.log.next_log(value.file)
       val buffer = map(file, value.offset, value.length, false)
       session.codec.readData(buffer, ^{
         if( value.sync ) {
           buffer.force()
         }
 
         unmap(buffer)
-        // info("Slave WAL update: %s, (offset: %d, length: %d), sending ack:%s", file, value.offset, value.length, caughtUp)
         wal_append_offset = value.offset+value.length
         wal_append_position = value.file + wal_append_offset
         wal_date = value.date
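The slave writes the replicated WAL bytes through a memory-mapped region and forces it to disk when the master requested a synced ack. A minimal NIO sketch of that pattern (plain java.nio rather than the store's own map/unmap helpers; the file path is an example):

import java.io.RandomAccessFile
import java.nio.channels.FileChannel

val raf = new RandomAccessFile("/tmp/example-wal.log", "rw")
try {
  // Map a region of the log file and copy the received bytes into it.
  val buffer = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096)
  buffer.put("replicated record bytes".getBytes("UTF-8"))
  // Equivalent to the value.sync branch above: flush the mapped region to disk
  // before acknowledging the write back to the master.
  buffer.force()
} finally {
  raf.close()   // the store also explicitly unmaps; plain NIO leaves that to the GC
}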
@@ -296,7 +297,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
     transport.setDispatchQueue(queue)
     transport.connecting(new URI(connect), null)
 
-    debug("Connecting download session.")
+    debug("%s: Connecting download session. Snapshot index at: %s".format(directory, state.snapshot_position.toHexString))
     transfer_session = new Session(transport, (session)=> {
 
       var total_files = 0
@@ -360,6 +361,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
       val buffer = map(target_file, 0, x.length, false)
       session.codec.readData(buffer, ^{
         unmap(buffer)
+        trace("%s, Downloaded %s, offset:%d, length:%d", directory, transfer.file, transfer.offset, transfer.length)
         downloaded_size += x.length
         downloaded_files += 1
         update_download_status
@@ -384,6 +386,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
       val buffer = map(dirty_index / x.file, 0, x.length, false)
       session.codec.readData(buffer, ^{
         unmap(buffer)
+        trace("%s, Downloaded %s, offset:%d, length:%d", directory, transfer.file, transfer.offset, transfer.length)
         downloaded_size += x.length
         downloaded_files += 1
         update_download_status
@@ -405,6 +408,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
     }
     client.writeExecutor {
       if( !state.index_files.isEmpty ) {
+        trace("%s: Index sync complete, copying to snapshot.", directory)
         client.copyDirtyIndexToSnapshot(state.wal_append_position)
       }
       client.replay_init()
@@ -20,7 +20,7 @@
 #
 log4j.rootLogger=WARN, console, file
 log4j.logger.org.apache.activemq=INFO
-log4j.logger.org.fusesource=INFO
+log4j.logger.org.apache.activemq.leveldb=INFO
 
 # Console will only display warnnings
 log4j.appender.console=org.apache.log4j.ConsoleAppender
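The trace(...) calls added in this commit are only visible when the leveldb logger is raised to TRACE. With the log4j.properties shown above (presumably the module's test logging config), that is a one-line change, for example:

# Illustrative addition, not part of this commit: surface the new trace output.
log4j.logger.org.apache.activemq.leveldb=TRACE

Leaving the logger at INFO keeps the default output unchanged and the new trace statements silent.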