git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@881270 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-11-17 13:19:35 +00:00
parent 0dc6e0c622
commit a55efe952c
4 changed files with 11 additions and 52 deletions

View File

@ -21,7 +21,6 @@ import java.io.InterruptedIOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayOutputStream; import org.apache.activemq.util.DataByteArrayOutputStream;
@ -49,7 +48,7 @@ class DataFileAppender {
protected final CountDownLatch shutdownDone = new CountDownLatch(1); protected final CountDownLatch shutdownDone = new CountDownLatch(1);
protected int maxWriteBatchSize = DEFAULT_MAX_BATCH_SIZE; protected int maxWriteBatchSize = DEFAULT_MAX_BATCH_SIZE;
protected boolean running; private boolean running;
private Thread thread; private Thread thread;
public static class WriteKey { public static class WriteKey {
@ -83,7 +82,6 @@ class DataFileAppender {
public final WriteCommand first; public final WriteCommand first;
public final CountDownLatch latch = new CountDownLatch(1); public final CountDownLatch latch = new CountDownLatch(1);
public int size; public int size;
public AtomicReference<IOException> exception = new AtomicReference<IOException>();
public WriteBatch(DataFile dataFile, WriteCommand write) throws IOException { public WriteBatch(DataFile dataFile, WriteCommand write) throws IOException {
this.dataFile = dataFile; this.dataFile = dataFile;
@ -181,10 +179,6 @@ class DataFileAppender {
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new InterruptedIOException(); throw new InterruptedIOException();
} }
IOException exception = batch.exception.get();
if (exception != null) {
throw exception;
}
} }
return location; return location;
@ -222,6 +216,9 @@ class DataFileAppender {
if (shutdown) { if (shutdown) {
throw new IOException("Async Writter Thread Shutdown"); throw new IOException("Async Writter Thread Shutdown");
} }
if (firstAsyncException != null) {
throw firstAsyncException;
}
if (!running) { if (!running) {
running = true; running = true;
@ -234,11 +231,6 @@ class DataFileAppender {
thread.setDaemon(true); thread.setDaemon(true);
thread.setName("ActiveMQ Data File Writer"); thread.setName("ActiveMQ Data File Writer");
thread.start(); thread.start();
firstAsyncException = null;
}
if (firstAsyncException != null) {
throw firstAsyncException;
} }
if (nextWriteBatch == null) { if (nextWriteBatch == null) {
@ -306,7 +298,6 @@ class DataFileAppender {
protected void processQueue() { protected void processQueue() {
DataFile dataFile = null; DataFile dataFile = null;
RandomAccessFile file = null; RandomAccessFile file = null;
WriteBatch wb = null;
try { try {
DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize); DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);
@ -330,7 +321,7 @@ class DataFileAppender {
enqueueMutex.notify(); enqueueMutex.notify();
} }
wb = (WriteBatch)o; WriteBatch wb = (WriteBatch)o;
if (dataFile != wb.dataFile) { if (dataFile != wb.dataFile) {
if (file != null) { if (file != null) {
dataFile.closeRandomAccessFile(file); dataFile.closeRandomAccessFile(file);
@ -415,14 +406,6 @@ class DataFileAppender {
} catch (IOException e) { } catch (IOException e) {
synchronized (enqueueMutex) { synchronized (enqueueMutex) {
firstAsyncException = e; firstAsyncException = e;
if (wb != null) {
wb.latch.countDown();
wb.exception.set(e);
}
if (nextWriteBatch != null) {
nextWriteBatch.latch.countDown();
nextWriteBatch.exception.set(e);
}
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
} finally { } finally {

View File

@ -47,7 +47,6 @@ class NIODataFileAppender extends DataFileAppender {
DataFile dataFile = null; DataFile dataFile = null;
RandomAccessFile file = null; RandomAccessFile file = null;
FileChannel channel = null; FileChannel channel = null;
WriteBatch wb = null;
try { try {
@ -82,14 +81,13 @@ class NIODataFileAppender extends DataFileAppender {
enqueueMutex.notify(); enqueueMutex.notify();
} }
wb = (WriteBatch)o; WriteBatch wb = (WriteBatch)o;
if (dataFile != wb.dataFile) { if (dataFile != wb.dataFile) {
if (file != null) { if (file != null) {
dataFile.closeRandomAccessFile(file); dataFile.closeRandomAccessFile(file);
} }
dataFile = wb.dataFile; dataFile = wb.dataFile;
file = dataFile.openRandomAccessFile(true); file = dataFile.openRandomAccessFile(true);
System.out.println("new channel?");
channel = file.getChannel(); channel = file.getChannel();
} }
@ -182,33 +180,16 @@ class NIODataFileAppender extends DataFileAppender {
} catch (IOException e) { } catch (IOException e) {
synchronized (enqueueMutex) { synchronized (enqueueMutex) {
firstAsyncException = e; firstAsyncException = e;
if (wb != null) {
wb.latch.countDown();
wb.exception.set(e);
}
if (nextWriteBatch != null) {
nextWriteBatch.latch.countDown();
nextWriteBatch.exception.set(e);
}
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
} finally { } finally {
try { try {
if (file != null) { if (file != null) {
dataFile.closeRandomAccessFile(file); dataFile.closeRandomAccessFile(file);
dataFile = null;
file.close();
file = null;
}
if (channel != null) {
System.out.println("Closing channel");
channel.close();
channel = null;
} }
} catch (IOException e) { } catch (IOException e) {
} }
shutdownDone.countDown(); shutdownDone.countDown();
running = false;
} }
} }

View File

@ -262,28 +262,23 @@ public final class IntrospectionSupport {
boolean first = true; boolean first = true;
for (Iterator iter = entrySet.iterator(); iter.hasNext();) { for (Iterator iter = entrySet.iterator(); iter.hasNext();) {
Map.Entry entry = (Map.Entry)iter.next(); Map.Entry entry = (Map.Entry)iter.next();
Object value = entry.getValue();
Object key = entry.getKey();
if (first) { if (first) {
first = false; first = false;
} else { } else {
buffer.append(", "); buffer.append(", ");
} }
buffer.append(key); buffer.append(entry.getKey());
buffer.append(" = "); buffer.append(" = ");
appendToString(buffer, entry.getValue());
appendToString(buffer, key, value);
} }
buffer.append("}"); buffer.append("}");
return buffer.toString(); return buffer.toString();
} }
protected static void appendToString(StringBuffer buffer, Object key, Object value) { protected static void appendToString(StringBuffer buffer, Object value) {
if (value instanceof ActiveMQDestination) { if (value instanceof ActiveMQDestination) {
ActiveMQDestination destination = (ActiveMQDestination)value; ActiveMQDestination destination = (ActiveMQDestination)value;
buffer.append(destination.getQualifiedName()); buffer.append(destination.getQualifiedName());
} else if (key.toString().contains("password")){
buffer.append("*****");
} else { } else {
buffer.append(value); buffer.append(value);
} }

View File

@ -18,7 +18,7 @@
# #
# The logging properties used during tests.. # The logging properties used during tests..
# #
log4j.rootLogger=DEBUG, out, stdout log4j.rootLogger=INFO, out, stdout
#log4j.logger.org.apache.activemq=DEBUG #log4j.logger.org.apache.activemq=DEBUG