ARTEMIS-4676 use ActionContext consistently for logging in CLI commands
This commit is contained in:
parent
fd1ef367d3
commit
7742936583
|
@ -52,9 +52,9 @@ public abstract class Configurable extends ActionAbstract {
|
|||
|
||||
protected void treatError(Exception e, String group, String command) {
|
||||
logger.debug(e.getMessage(), e);
|
||||
System.err.println();
|
||||
System.err.println("Error:" + e.getMessage());
|
||||
System.err.println();
|
||||
getActionContext().err.println();
|
||||
getActionContext().err.println("Error:" + e.getMessage());
|
||||
getActionContext().err.println();
|
||||
|
||||
if (!(e instanceof ActiveMQException)) {
|
||||
e.printStackTrace();
|
||||
|
|
|
@ -29,9 +29,9 @@ public class Connect extends ConnectionAbstract {
|
|||
try {
|
||||
CONNECTION_INFORMATION.remove();
|
||||
createConnectionFactory();
|
||||
System.out.println("Connection Successful!");
|
||||
context.out.println("Connection Successful!");
|
||||
} catch (Exception e) {
|
||||
System.out.println("Connection Failure!");
|
||||
context.out.println("Connection Failure!");
|
||||
e.printStackTrace();
|
||||
}
|
||||
return null;
|
||||
|
|
|
@ -994,7 +994,7 @@ public class Create extends InstallAbstract {
|
|||
RoutingType.valueOf(routingType.toUpperCase());
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
System.err.println("Invalid routing type: " + routingType);
|
||||
getActionContext().err.println("Invalid routing type: " + routingType);
|
||||
}
|
||||
printWriter.println(" <address name=\"" + name + "\">");
|
||||
printWriter.println(" <" + routingType + ">");
|
||||
|
@ -1011,7 +1011,7 @@ public class Create extends InstallAbstract {
|
|||
RoutingType.valueOf(routingType.toUpperCase());
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
System.err.println("Invalid routing type: " + routingType);
|
||||
getActionContext().err.println("Invalid routing type: " + routingType);
|
||||
}
|
||||
printWriter.println(" <address name=\"" + name + "\">");
|
||||
printWriter.println(" <" + routingType + "/>");
|
||||
|
@ -1042,8 +1042,8 @@ public class Create extends InstallAbstract {
|
|||
|
||||
filters.put("${page-sync.settings}", readTextFile(ETC_PAGE_SYNC_SETTINGS, syncFilter));
|
||||
} else {
|
||||
long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, false, "journal-test.tmp", ActiveMQDefaultConfiguration.getDefaultJournalMaxIoAio(), journalType);
|
||||
long nanoseconds = SyncCalculation.toNanos(time, writes, verbose);
|
||||
long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, false, "journal-test.tmp", ActiveMQDefaultConfiguration.getDefaultJournalMaxIoAio(), journalType, getActionContext());
|
||||
long nanoseconds = SyncCalculation.toNanos(time, writes, verbose, getActionContext());
|
||||
double writesPerMillisecond = (double) writes / (double) time;
|
||||
|
||||
String writesPerMillisecondStr = new DecimalFormat("###.##").format(writesPerMillisecond);
|
||||
|
@ -1061,8 +1061,8 @@ public class Create extends InstallAbstract {
|
|||
if (noJournalSync) {
|
||||
syncFilter.put("${nanoseconds}", "0");
|
||||
} else if (journalType != JournalType.NIO) {
|
||||
long nioTime = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, false, "journal-test.tmp", ActiveMQDefaultConfiguration.getDefaultJournalMaxIoAio(), JournalType.NIO);
|
||||
long nioNanoseconds = SyncCalculation.toNanos(nioTime, writes, verbose);
|
||||
long nioTime = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, false, "journal-test.tmp", ActiveMQDefaultConfiguration.getDefaultJournalMaxIoAio(), JournalType.NIO, getActionContext());
|
||||
long nioNanoseconds = SyncCalculation.toNanos(nioTime, writes, verbose, getActionContext());
|
||||
syncFilter.put("${nanoseconds}", Long.toString(nioNanoseconds));
|
||||
}
|
||||
|
||||
|
@ -1075,7 +1075,7 @@ public class Create extends InstallAbstract {
|
|||
filters.put("${journal-buffer.settings}", "");
|
||||
filters.put("${page-sync.settings}", "");
|
||||
e.printStackTrace();
|
||||
System.err.println("Couldn't perform sync calculation, using default values");
|
||||
getActionContext().err.println("Couldn't perform sync calculation, using default values");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1096,7 +1096,7 @@ public class Create extends InstallAbstract {
|
|||
}
|
||||
tmpFile.delete();
|
||||
if (!supportsLibaio) {
|
||||
System.err.println("The filesystem used on " + directory + " doesn't support libAIO and O_DIRECT files, switching journal-type to NIO");
|
||||
getActionContext().err.println("The filesystem used on " + directory + " doesn't support libAIO and O_DIRECT files, switching journal-type to NIO");
|
||||
}
|
||||
return supportsLibaio;
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ public class Disconnect extends ConnectionAbstract {
|
|||
public Object execute(ActionContext context) throws Exception {
|
||||
super.execute(context);
|
||||
CONNECTION_INFORMATION.remove();
|
||||
System.out.println("Connection information cleared!");
|
||||
context.out.println("Connection information cleared!");
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,16 +26,16 @@ public class PWD extends ActionAbstract {
|
|||
public Object execute(ActionContext context) throws Exception {
|
||||
super.execute(context);
|
||||
|
||||
System.out.println();
|
||||
System.out.println("*******************************************************************************************************************************");
|
||||
System.out.println("* Artemis instance:: " + getBrokerInstance());
|
||||
System.out.println("* Home:: " + getBrokerHome());
|
||||
System.out.println("* etc:: " + getBrokerEtc());
|
||||
context.out.println();
|
||||
context.out.println("*******************************************************************************************************************************");
|
||||
context.out.println("* Artemis instance:: " + getBrokerInstance());
|
||||
context.out.println("* Home:: " + getBrokerHome());
|
||||
context.out.println("* etc:: " + getBrokerEtc());
|
||||
|
||||
String canonicalPath = new java.io.File(".").getCanonicalPath();
|
||||
System.out.println("* Current dir:" + canonicalPath);
|
||||
System.out.println("*******************************************************************************************************************************");
|
||||
System.out.println();
|
||||
context.out.println("* Current dir:" + canonicalPath);
|
||||
context.out.println("*******************************************************************************************************************************");
|
||||
context.out.println();
|
||||
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -195,7 +195,7 @@ public class Run extends LockAbstract {
|
|||
public void run() {
|
||||
if (allowKill && fileKill.exists()) {
|
||||
try {
|
||||
System.err.println("Halting by user request");
|
||||
getActionContext().err.println("Halting by user request");
|
||||
fileKill.delete();
|
||||
} catch (Throwable ignored) {
|
||||
}
|
||||
|
@ -243,15 +243,15 @@ public class Run extends LockAbstract {
|
|||
}
|
||||
|
||||
|
||||
public static void verifyOlderLogging(File etc) throws Exception {
|
||||
public void verifyOlderLogging(File etc) {
|
||||
File newLogging = new File(etc, Create.ETC_LOG4J2_PROPERTIES);
|
||||
File oldLogging = new File(etc, Upgrade.OLD_LOGGING_PROPERTIES);
|
||||
|
||||
if (oldLogging.exists() && !newLogging.exists()) {
|
||||
System.out.println("******************************************************************************************************************************************************************************");
|
||||
System.out.println("Your system has the older logging file " + Upgrade.OLD_LOGGING_PROPERTIES + ", but not the new " + Create.ETC_LOG4J2_PROPERTIES);
|
||||
System.out.println("It appears you did not complete the migration on this artemis instance properly. Please check all the settings or run the './artemis upgrade' command from the new artemis home");
|
||||
System.out.println("******************************************************************************************************************************************************************************");
|
||||
getActionContext().out.println("******************************************************************************************************************************************************************************");
|
||||
getActionContext().out.println("Your system has the older logging file " + Upgrade.OLD_LOGGING_PROPERTIES + ", but not the new " + Create.ETC_LOG4J2_PROPERTIES);
|
||||
getActionContext().out.println("It appears you did not complete the migration on this artemis instance properly. Please check all the settings or run the './artemis upgrade' command from the new artemis home");
|
||||
getActionContext().out.println("******************************************************************************************************************************************************************************");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -50,7 +50,7 @@ public class Browse extends DestAbstract {
|
|||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
}
|
||||
Destination dest = getDestination(session);
|
||||
threadsArray[i] = new ConsumerThread(session, dest, i);
|
||||
threadsArray[i] = new ConsumerThread(session, dest, i, context);
|
||||
|
||||
threadsArray[i]
|
||||
.setVerbose(verbose)
|
||||
|
|
|
@ -234,7 +234,7 @@ public class ConnectionAbstract extends InputAbstract {
|
|||
void saveConnectionInfo(String brokerURL, String user, String password) {
|
||||
if (Shell.inShell() && CONNECTION_INFORMATION.get() == null) {
|
||||
CONNECTION_INFORMATION.set(new ConnectionInformation(brokerURL, user, password));
|
||||
System.out.println("CLI connected to broker " + brokerURL + ", user:" + user);
|
||||
getActionContext().out.println("CLI connected to broker " + brokerURL + ", user:" + user);
|
||||
this.brokerURL = brokerURL;
|
||||
this.user = user;
|
||||
this.password = password;
|
||||
|
|
|
@ -63,14 +63,14 @@ public class Consumer extends DestAbstract {
|
|||
if (file != null) {
|
||||
serializer = getMessageSerializer();
|
||||
if (serializer == null) {
|
||||
System.err.println("Error. Unable to instantiate serializer class: " + this.serializer);
|
||||
context.err.println("Error. Unable to instantiate serializer class: " + this.serializer);
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
outputStream = new BufferedOutputStream(new FileOutputStream(file));
|
||||
} catch (Exception e) {
|
||||
System.err.println("Error: Unable to open file for writing\n" + e.getMessage());
|
||||
context.err.println("Error: Unable to open file for writing\n" + e.getMessage());
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -92,7 +92,7 @@ public class Consumer extends DestAbstract {
|
|||
}
|
||||
|
||||
Destination dest = getDestination(session);
|
||||
threadsArray[i] = new ConsumerThread(session, dest, i);
|
||||
threadsArray[i] = new ConsumerThread(session, dest, i, context);
|
||||
|
||||
threadsArray[i]
|
||||
.setVerbose(verbose)
|
||||
|
|
|
@ -31,12 +31,15 @@ import javax.jms.Topic;
|
|||
import java.util.Enumeration;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
||||
|
||||
public class ConsumerThread extends Thread {
|
||||
|
||||
long messageCount = 1000;
|
||||
int receiveTimeOut = 3000;
|
||||
Destination destination;
|
||||
Session session;
|
||||
ActionContext context;
|
||||
boolean durable;
|
||||
boolean breakOnNull = true;
|
||||
int sleep;
|
||||
|
@ -53,10 +56,11 @@ public class ConsumerThread extends Thread {
|
|||
boolean bytesAsText;
|
||||
MessageListener listener;
|
||||
|
||||
public ConsumerThread(Session session, Destination destination, int threadNr) {
|
||||
public ConsumerThread(Session session, Destination destination, int threadNr, ActionContext context) {
|
||||
super("Consumer " + destination.toString() + ", thread=" + threadNr);
|
||||
this.destination = destination;
|
||||
this.session = session;
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -74,32 +78,32 @@ public class ConsumerThread extends Thread {
|
|||
} else {
|
||||
if (browse) {
|
||||
if (verbose) {
|
||||
System.out.println("..." + msg);
|
||||
context.out.println("..." + msg);
|
||||
}
|
||||
if (bytesAsText && (msg instanceof BytesMessage)) {
|
||||
long length = ((BytesMessage) msg).getBodyLength();
|
||||
byte[] bytes = new byte[(int) length];
|
||||
((BytesMessage) msg).readBytes(bytes);
|
||||
System.out.println("Message:" + msg);
|
||||
context.out.println("Message:" + msg);
|
||||
}
|
||||
} else {
|
||||
if (verbose) {
|
||||
System.out.println("JMS Message ID:" + msg.getJMSMessageID());
|
||||
context.out.println("JMS Message ID:" + msg.getJMSMessageID());
|
||||
if (bytesAsText && (msg instanceof BytesMessage)) {
|
||||
long length = ((BytesMessage) msg).getBodyLength();
|
||||
byte[] bytes = new byte[(int) length];
|
||||
((BytesMessage) msg).readBytes(bytes);
|
||||
System.out.println("Received a message with " + bytes.length);
|
||||
context.out.println("Received a message with " + bytes.length);
|
||||
}
|
||||
|
||||
if (msg instanceof TextMessage) {
|
||||
String text = ((TextMessage) msg).getText();
|
||||
System.out.println("Received text sized at " + text.length());
|
||||
context.out.println("Received text sized at " + text.length());
|
||||
}
|
||||
|
||||
if (msg instanceof ObjectMessage) {
|
||||
Object obj = ((ObjectMessage) msg).getObject();
|
||||
System.out.println("Received object " + obj.toString().length());
|
||||
context.out.println("Received object " + obj.toString().length());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -110,7 +114,7 @@ public class ConsumerThread extends Thread {
|
|||
running = true;
|
||||
QueueBrowser consumer = null;
|
||||
String threadName = Thread.currentThread().getName();
|
||||
System.out.println(threadName + " trying to browse " + messageCount + " messages");
|
||||
context.out.println(threadName + " trying to browse " + messageCount + " messages");
|
||||
try {
|
||||
if (filter != null) {
|
||||
consumer = session.createBrowser((Queue) destination, filter);
|
||||
|
@ -122,7 +126,7 @@ public class ConsumerThread extends Thread {
|
|||
while (enumBrowse.hasMoreElements()) {
|
||||
Message msg = enumBrowse.nextElement();
|
||||
if (msg != null) {
|
||||
System.out.println(threadName + " browsing " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
|
||||
context.out.println(threadName + " browsing " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
|
||||
handle(msg, true);
|
||||
received++;
|
||||
|
||||
|
@ -147,7 +151,7 @@ public class ConsumerThread extends Thread {
|
|||
finished.countDown();
|
||||
}
|
||||
if (consumer != null) {
|
||||
System.out.println(threadName + " browsed: " + this.getReceived() + " messages");
|
||||
context.out.println(threadName + " browsed: " + this.getReceived() + " messages");
|
||||
try {
|
||||
consumer.close();
|
||||
} catch (JMSException e) {
|
||||
|
@ -156,14 +160,14 @@ public class ConsumerThread extends Thread {
|
|||
}
|
||||
}
|
||||
|
||||
System.out.println(threadName + " Browser thread finished");
|
||||
context.out.println(threadName + " Browser thread finished");
|
||||
}
|
||||
|
||||
public void consume() {
|
||||
running = true;
|
||||
MessageConsumer consumer = null;
|
||||
String threadName = Thread.currentThread().getName();
|
||||
System.out.println(threadName + " wait until " + messageCount + " messages are consumed");
|
||||
context.out.println(threadName + " wait until " + messageCount + " messages are consumed");
|
||||
try {
|
||||
if (durable && destination instanceof Topic) {
|
||||
if (filter != null) {
|
||||
|
@ -184,10 +188,10 @@ public class ConsumerThread extends Thread {
|
|||
Message msg = consumer.receive(receiveTimeOut);
|
||||
if (msg != null) {
|
||||
if (verbose) {
|
||||
System.out.println(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
|
||||
context.out.println(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
|
||||
} else {
|
||||
if (++count % 1000 == 0) {
|
||||
System.out.println("Received " + count);
|
||||
context.out.println("Received " + count);
|
||||
}
|
||||
}
|
||||
handle(msg, false);
|
||||
|
@ -200,12 +204,12 @@ public class ConsumerThread extends Thread {
|
|||
|
||||
if (session.getTransacted()) {
|
||||
if (batchSize > 0 && received > 0 && received % batchSize == 0) {
|
||||
System.out.println(threadName + " Committing transaction: " + transactions++);
|
||||
context.out.println(threadName + " Committing transaction: " + transactions++);
|
||||
session.commit();
|
||||
}
|
||||
} else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE && msg != null) {
|
||||
if (batchSize > 0 && received > 0 && received % batchSize == 0) {
|
||||
System.out.println("Acknowledging last " + batchSize + " messages; messages so far = " + received);
|
||||
context.out.println("Acknowledging last " + batchSize + " messages; messages so far = " + received);
|
||||
msg.acknowledge();
|
||||
}
|
||||
}
|
||||
|
@ -221,11 +225,11 @@ public class ConsumerThread extends Thread {
|
|||
}
|
||||
|
||||
|
||||
System.out.println(threadName + " Consumed: " + this.getMessageCount() + " messages");
|
||||
context.out.println(threadName + " Consumed: " + this.getMessageCount() + " messages");
|
||||
long tEnd = System.currentTimeMillis();
|
||||
long elapsed = (tEnd - tStart) / 1000;
|
||||
System.out.println(threadName + " Elapsed time in second : " + elapsed + " s");
|
||||
System.out.println(threadName + " Elapsed time in milli second : " + (tEnd - tStart) + " milli seconds");
|
||||
context.out.println(threadName + " Elapsed time in second : " + elapsed + " s");
|
||||
context.out.println(threadName + " Elapsed time in milli second : " + (tEnd - tStart) + " milli seconds");
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
|
@ -234,7 +238,7 @@ public class ConsumerThread extends Thread {
|
|||
finished.countDown();
|
||||
}
|
||||
if (consumer != null) {
|
||||
System.out.println(threadName + " Consumed: " + this.getReceived() + " messages");
|
||||
context.out.println(threadName + " Consumed: " + this.getReceived() + " messages");
|
||||
try {
|
||||
consumer.close();
|
||||
} catch (JMSException e) {
|
||||
|
@ -243,7 +247,7 @@ public class ConsumerThread extends Thread {
|
|||
}
|
||||
}
|
||||
|
||||
System.out.println(threadName + " Consumer thread finished");
|
||||
context.out.println(threadName + " Consumer thread finished");
|
||||
}
|
||||
|
||||
public int getReceived() {
|
||||
|
|
|
@ -55,13 +55,13 @@ public class DestAbstract extends ConnectionAbstract {
|
|||
try {
|
||||
return (MessageSerializer) Class.forName(serializer).getConstructor().newInstance();
|
||||
} catch (Exception e) {
|
||||
System.err.println("Error: unable to instantiate serializer class: " + serializer);
|
||||
System.err.println("Defaulting to: " + XMLMessageSerializer.class.getName());
|
||||
getActionContext().err.println("Error: unable to instantiate serializer class: " + serializer);
|
||||
getActionContext().err.println("Defaulting to: " + XMLMessageSerializer.class.getName());
|
||||
}
|
||||
}
|
||||
|
||||
if (protocol != ConnectionProtocol.CORE) {
|
||||
System.err.println("Default Serializer does not support: " + protocol + " protocol");
|
||||
getActionContext().err.println("Default Serializer does not support: " + protocol + " protocol");
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
|
@ -153,7 +153,7 @@ public class Producer extends DestAbstract {
|
|||
try {
|
||||
MessageSerializer serializer = getMessageSerializer();
|
||||
if (serializer == null) {
|
||||
System.err.println("Error. Unable to instantiate serializer class: " + serializer);
|
||||
context.err.println("Error. Unable to instantiate serializer class: " + serializer);
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -161,7 +161,7 @@ public class Producer extends DestAbstract {
|
|||
try {
|
||||
in = new FileInputStream(file);
|
||||
} catch (Exception e) {
|
||||
System.err.println("Error: Unable to open file for reading\n" + e.getMessage());
|
||||
context.err.println("Error: Unable to open file for reading\n" + e.getMessage());
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -179,12 +179,12 @@ public class Producer extends DestAbstract {
|
|||
session.commit();
|
||||
serializer.stop();
|
||||
} catch (Exception e) {
|
||||
System.err.println("Error occurred during import. Rolling back.");
|
||||
context.err.println("Error occurred during import. Rolling back.");
|
||||
session.rollback();
|
||||
e.printStackTrace();
|
||||
return 0;
|
||||
}
|
||||
System.out.println("Sent " + messageCount + " Messages.");
|
||||
context.out.println("Sent " + messageCount + " Messages.");
|
||||
return messageCount;
|
||||
} else {
|
||||
ProducerThread[] threadsArray = new ProducerThread[threads];
|
||||
|
@ -196,7 +196,7 @@ public class Producer extends DestAbstract {
|
|||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
}
|
||||
Destination dest = getDestination(session);
|
||||
threadsArray[i] = new ProducerThread(session, dest, i);
|
||||
threadsArray[i] = new ProducerThread(session, dest, i, context);
|
||||
|
||||
threadsArray[i]
|
||||
.setVerbose(verbose)
|
||||
|
|
|
@ -30,11 +30,13 @@ import java.io.InputStreamReader;
|
|||
import java.net.URL;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||
|
||||
public class ProducerThread extends Thread {
|
||||
|
||||
protected final Session session;
|
||||
protected final ActionContext context;
|
||||
|
||||
boolean verbose;
|
||||
long messageCount = 1000;
|
||||
|
@ -59,10 +61,11 @@ public class ProducerThread extends Thread {
|
|||
final ReusableLatch finished = new ReusableLatch(1);
|
||||
final ReusableLatch paused = new ReusableLatch(0);
|
||||
|
||||
public ProducerThread(Session session, Destination destination, int threadNr) {
|
||||
public ProducerThread(Session session, Destination destination, int threadNr, ActionContext context) {
|
||||
super("Producer " + destination.toString() + ", thread=" + threadNr);
|
||||
this.destination = destination;
|
||||
this.session = session;
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -76,7 +79,7 @@ public class ProducerThread extends Thread {
|
|||
initPayLoad();
|
||||
running = true;
|
||||
|
||||
System.out.println(threadName + " Started to calculate elapsed time ...\n");
|
||||
context.out.println(threadName + " Started to calculate elapsed time ...\n");
|
||||
long tStart = System.currentTimeMillis();
|
||||
|
||||
if (runIndefinitely) {
|
||||
|
@ -97,11 +100,11 @@ public class ProducerThread extends Thread {
|
|||
} catch (Throwable ignored) {
|
||||
}
|
||||
|
||||
System.out.println(threadName + " Produced: " + this.getSentCount() + " messages");
|
||||
context.out.println(threadName + " Produced: " + this.getSentCount() + " messages");
|
||||
long tEnd = System.currentTimeMillis();
|
||||
long elapsed = (tEnd - tStart) / 1000;
|
||||
System.out.println(threadName + " Elapsed time in second : " + elapsed + " s");
|
||||
System.out.println(threadName + " Elapsed time in milli second : " + (tEnd - tStart) + " milli seconds");
|
||||
context.out.println(threadName + " Elapsed time in second : " + elapsed + " s");
|
||||
context.out.println(threadName + " Elapsed time in milli second : " + (tEnd - tStart) + " milli seconds");
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
|
@ -124,11 +127,11 @@ public class ProducerThread extends Thread {
|
|||
|
||||
producer.send(message);
|
||||
if (verbose) {
|
||||
System.out.println(threadName + " Sent: " + (message instanceof TextMessage ? ((TextMessage) message).getText() : message.getJMSMessageID()));
|
||||
context.out.println(threadName + " Sent: " + (message instanceof TextMessage ? ((TextMessage) message).getText() : message.getJMSMessageID()));
|
||||
}
|
||||
|
||||
if (transactionBatchSize > 0 && sentCount.get() > 0 && sentCount.get() % transactionBatchSize == 0) {
|
||||
System.out.println(threadName + " Committing transaction: " + transactions++);
|
||||
context.out.println(threadName + " Committing transaction: " + transactions++);
|
||||
session.commit();
|
||||
}
|
||||
|
||||
|
|
|
@ -335,7 +335,7 @@ public class Transfer extends InputAbstract {
|
|||
}
|
||||
}
|
||||
|
||||
System.out.println("Connection brokerURL = " + sourceURL);
|
||||
context.out.println("Connection brokerURL = " + sourceURL);
|
||||
|
||||
ConnectionFactory sourceConnectionFactory = createConnectionFactory("source", sourceProtocol, sourceURL, sourceUser, sourcePassword, sourceClientID);
|
||||
Connection sourceConnection = sourceConnectionFactory.createConnection();
|
||||
|
@ -383,7 +383,7 @@ public class Transfer extends InputAbstract {
|
|||
MessageProducer producer = targetSession.createProducer(targetDestination);
|
||||
|
||||
if (sourceURL.equals(targetURL) && sourceDestination.equals(targetDestination)) {
|
||||
System.out.println("You cannot transfer between " + sourceURL + "/" + sourceDestination + " and " + targetURL + "/" + targetDestination + ".\n" + "That would create an infinite recursion.");
|
||||
context.out.println("You cannot transfer between " + sourceURL + "/" + sourceDestination + " and " + targetURL + "/" + targetDestination + ".\n" + "That would create an infinite recursion.");
|
||||
throw new IllegalArgumentException("cannot use " + sourceDestination + " == " + targetDestination);
|
||||
}
|
||||
|
||||
|
@ -402,7 +402,7 @@ public class Transfer extends InputAbstract {
|
|||
|
||||
if (receivedMessage == null) {
|
||||
if (isVerbose()) {
|
||||
System.out.println("could not receive any more messages");
|
||||
context.out.println("could not receive any more messages");
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -411,10 +411,10 @@ public class Transfer extends InputAbstract {
|
|||
total++;
|
||||
|
||||
if (isVerbose()) {
|
||||
System.out.println("Received message " + total + " with " + pending + " messages pending to be commited");
|
||||
context.out.println("Received message " + total + " with " + pending + " messages pending to be commited");
|
||||
}
|
||||
if (pending > commitInterval) {
|
||||
System.out.println("Transferred " + pending + " messages of " + total);
|
||||
context.out.println("Transferred " + pending + " messages of " + total);
|
||||
pending = 0;
|
||||
targetSession.commit();
|
||||
if (!isCopy()) {
|
||||
|
@ -423,7 +423,7 @@ public class Transfer extends InputAbstract {
|
|||
}
|
||||
}
|
||||
|
||||
System.out.println("Transferred a total of " + total + " messages");
|
||||
context.out.println("Transferred a total of " + total + " messages");
|
||||
|
||||
if (pending != 0) {
|
||||
targetSession.commit();
|
||||
|
@ -464,12 +464,12 @@ public class Transfer extends InputAbstract {
|
|||
String clientID) throws Exception {
|
||||
if (protocol.equals("core")) {
|
||||
if (isVerbose()) {
|
||||
System.out.println("Creating " + role + " CORE Connection towards " + brokerURL);
|
||||
getActionContext().out.println("Creating " + role + " CORE Connection towards " + brokerURL);
|
||||
}
|
||||
return createCoreConnectionFactory(brokerURL, user, password, clientID);
|
||||
} else if (protocol.equals("amqp")) {
|
||||
if (isVerbose()) {
|
||||
System.out.println("Creating " + role + " AMQP Connection towards " + brokerURL);
|
||||
getActionContext().out.println("Creating " + role + " AMQP Connection towards " + brokerURL);
|
||||
}
|
||||
return createAMQPConnectionFactory(brokerURL, user, password, clientID);
|
||||
} else {
|
||||
|
@ -523,7 +523,7 @@ public class Transfer extends InputAbstract {
|
|||
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerURL, user, password);
|
||||
|
||||
if (clientID != null) {
|
||||
System.out.println("Consumer:: clientID = " + clientID);
|
||||
getActionContext().out.println("Consumer:: clientID = " + clientID);
|
||||
cf.setClientID(clientID);
|
||||
}
|
||||
try {
|
||||
|
@ -553,7 +553,7 @@ public class Transfer extends InputAbstract {
|
|||
}
|
||||
|
||||
Pair<String, String> userPassword(String uri) {
|
||||
System.out.println("Type in user/password towards " + uri);
|
||||
getActionContext().out.println("Type in user/password towards " + uri);
|
||||
String user, password;
|
||||
user = input("--user", "Type the username for a retry", null);
|
||||
password = inputPassword("--password", "Type the password for a retry", null);
|
||||
|
|
|
@ -72,7 +72,7 @@ public abstract class PerfCommand extends ConnectionAbstract {
|
|||
public Object execute(ActionContext context) throws Exception {
|
||||
super.execute(context);
|
||||
if (txSize > 0) {
|
||||
System.out.println("--tx-size is deprecated, please use --commit-interval");
|
||||
context.out.println("--tx-size is deprecated, please use --commit-interval");
|
||||
commitInterval = txSize;
|
||||
}
|
||||
final ConnectionFactory factory = createConnectionFactory(brokerURL, user, password, null, protocol);
|
||||
|
|
|
@ -51,8 +51,8 @@ public abstract class LockAbstract extends DataAbstract {
|
|||
super.execute(context);
|
||||
|
||||
if (getBrokerInstance() == null) {
|
||||
System.err.println("Warning: You are running a data tool outside of any broker instance. Modifying data on a running server might break the server's data");
|
||||
System.err.println();
|
||||
context.err.println("Warning: You are running a data tool outside of any broker instance. Modifying data on a running server might break the server's data");
|
||||
context.err.println();
|
||||
} else {
|
||||
lockCLI(getLockPlace());
|
||||
}
|
||||
|
|
|
@ -75,10 +75,10 @@ public class PerfJournal extends OptionalLocking {
|
|||
fileConfiguration.setJournalType(JournalType.getType(journalType));
|
||||
}
|
||||
|
||||
System.out.println("");
|
||||
System.out.println("Auto tuning journal ...");
|
||||
context.out.println("");
|
||||
context.out.println("Auto tuning journal ...");
|
||||
|
||||
System.out.println("Performing " + tries + " tests writing " + writes + " blocks of " + size + " on each test, sync=" + fileConfiguration.isJournalDatasync() + " with journalType = " + fileConfiguration.getJournalType());
|
||||
context.out.println("Performing " + tries + " tests writing " + writes + " blocks of " + size + " on each test, sync=" + fileConfiguration.isJournalDatasync() + " with journalType = " + fileConfiguration.getJournalType());
|
||||
|
||||
fileConfiguration.getJournalLocation().mkdirs();
|
||||
|
||||
|
@ -86,9 +86,9 @@ public class PerfJournal extends OptionalLocking {
|
|||
maxAIO = fileConfiguration.getJournalMaxIO_AIO();
|
||||
}
|
||||
|
||||
long time = SyncCalculation.syncTest(fileConfiguration.getJournalLocation(), size, writes, tries, verbose, fileConfiguration.isJournalDatasync(), syncWrites, fileName, maxAIO, fileConfiguration.getJournalType());
|
||||
long time = SyncCalculation.syncTest(fileConfiguration.getJournalLocation(), size, writes, tries, verbose, fileConfiguration.isJournalDatasync(), syncWrites, fileName, maxAIO, fileConfiguration.getJournalType(), context);
|
||||
|
||||
long nanosecondsWait = SyncCalculation.toNanos(time, writes, verbose);
|
||||
long nanosecondsWait = SyncCalculation.toNanos(time, writes, verbose, context);
|
||||
double writesPerMillisecond = (double) writes / (double) time;
|
||||
|
||||
String writesPerMillisecondStr = new DecimalFormat("###.##").format(writesPerMillisecond);
|
||||
|
|
|
@ -374,7 +374,7 @@ public final class XmlDataExporter extends DBOption {
|
|||
xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_PARENT);
|
||||
|
||||
if (logInterval > 0) {
|
||||
System.err.println("Processing journal messages");
|
||||
getActionContext().err.println("Processing journal messages");
|
||||
}
|
||||
|
||||
long msgs = 0;
|
||||
|
@ -385,7 +385,7 @@ public final class XmlDataExporter extends DBOption {
|
|||
msgs++;
|
||||
if (logInterval > 0) {
|
||||
if (msgs % logInterval == 0) {
|
||||
System.err.println("exported " + msgs + " messages from journal");
|
||||
getActionContext().err.println("exported " + msgs + " messages from journal");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -433,7 +433,7 @@ public final class XmlDataExporter extends DBOption {
|
|||
while (iter.hasNext()) {
|
||||
msgs++;
|
||||
if (logInterval > 0 && msgs % logInterval == 0) {
|
||||
System.err.println("Exported " + msgs + " messages from paging");
|
||||
getActionContext().err.println("Exported " + msgs + " messages from paging");
|
||||
}
|
||||
PagedMessage message = iter.next();
|
||||
message.initMessage(storageManager);
|
||||
|
|
|
@ -299,7 +299,7 @@ public final class XmlDataImporter extends ActionAbstract {
|
|||
messageNr++;
|
||||
|
||||
if (messageNr % commitInterval == 0) {
|
||||
System.err.println("Processed " + messageNr + " messages");
|
||||
getActionContext().err.println("Processed " + messageNr + " messages");
|
||||
session.commit();
|
||||
}
|
||||
XMLMessageImporter.MessageInfo info = messageReader.readMessage(false);
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.text.DecimalFormat;
|
|||
import java.util.Arrays;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
|
@ -74,7 +75,8 @@ public class SyncCalculation {
|
|||
boolean syncWrites,
|
||||
String fileName,
|
||||
int maxAIO,
|
||||
JournalType journalType) throws Exception {
|
||||
JournalType journalType,
|
||||
ActionContext context) throws Exception {
|
||||
SequentialFileFactory factory = newFactory(datafolder, fsync, journalType, blockSize * blocks, maxAIO);
|
||||
|
||||
if (factory instanceof AIOSequentialFileFactory) {
|
||||
|
@ -83,17 +85,17 @@ public class SyncCalculation {
|
|||
//the write latencies could be taken only when writes are effectively synchronous
|
||||
|
||||
if (journalType == JournalType.ASYNCIO && syncWrites) {
|
||||
System.out.println();
|
||||
System.out.println("*******************************************************************************************");
|
||||
System.out.println("*** Notice: The recommendation for AsyncIO journal is to not use --sync-writes ***");
|
||||
System.out.println("*** The measures here will be useful to understand your device ***");
|
||||
System.out.println("*** however the result here won't represent the best configuration option ***");
|
||||
System.out.println("*******************************************************************************************");
|
||||
System.out.println();
|
||||
context.out.println();
|
||||
context.out.println("*******************************************************************************************");
|
||||
context.out.println("*** Notice: The recommendation for AsyncIO journal is to not use --sync-writes ***");
|
||||
context.out.println("*** The measures here will be useful to understand your device ***");
|
||||
context.out.println("*** however the result here won't represent the best configuration option ***");
|
||||
context.out.println("*******************************************************************************************");
|
||||
context.out.println();
|
||||
}
|
||||
|
||||
if (verbose) {
|
||||
System.out.println("Using " + factory.getClass().getName() + " to calculate sync times, alignment=" + factory.getAlignment());
|
||||
context.out.println("Using " + factory.getClass().getName() + " to calculate sync times, alignment=" + factory.getAlignment());
|
||||
}
|
||||
SequentialFile file = factory.createSequentialFile(fileName);
|
||||
//to be sure that a process/thread crash won't leave the dataFolder with garbage files
|
||||
|
@ -131,8 +133,8 @@ public class SyncCalculation {
|
|||
for (int ntry = 0; ntry < tries; ntry++) {
|
||||
|
||||
if (verbose) {
|
||||
System.out.println("**************************************************");
|
||||
System.out.println(ntry + " of " + tries + " calculation");
|
||||
context.out.println("**************************************************");
|
||||
context.out.println(ntry + " of " + tries + " calculation");
|
||||
}
|
||||
file.open();
|
||||
file.position(0);
|
||||
|
@ -156,10 +158,10 @@ public class SyncCalculation {
|
|||
|
||||
if (verbose) {
|
||||
double writesPerMillisecond = (double) blocks / (double) result[ntry];
|
||||
System.out.println("Time = " + result[ntry] + " milliseconds");
|
||||
System.out.println("Writes / millisecond = " + dcformat.format(writesPerMillisecond));
|
||||
System.out.println("bufferTimeout = " + toNanos(result[ntry], blocks, verbose));
|
||||
System.out.println("**************************************************");
|
||||
context.out.println("Time = " + result[ntry] + " milliseconds");
|
||||
context.out.println("Writes / millisecond = " + dcformat.format(writesPerMillisecond));
|
||||
context.out.println("bufferTimeout = " + toNanos(result[ntry], blocks, verbose, context));
|
||||
context.out.println("**************************************************");
|
||||
}
|
||||
file.close();
|
||||
|
||||
|
@ -206,12 +208,12 @@ public class SyncCalculation {
|
|||
}
|
||||
}
|
||||
|
||||
public static long toNanos(long time, long blocks, boolean verbose) {
|
||||
public static long toNanos(long time, long blocks, boolean verbose, ActionContext context) {
|
||||
|
||||
double blocksPerMillisecond = (double) blocks / (double) (time);
|
||||
|
||||
if (verbose) {
|
||||
System.out.println("Blocks per millisecond::" + blocksPerMillisecond);
|
||||
context.out.println("Blocks per millisecond::" + blocksPerMillisecond);
|
||||
}
|
||||
|
||||
long nanoSeconds = TimeUnit.NANOSECONDS.convert(1, TimeUnit.MILLISECONDS);
|
||||
|
@ -219,7 +221,7 @@ public class SyncCalculation {
|
|||
long timeWait = (long) (nanoSeconds / blocksPerMillisecond);
|
||||
|
||||
if (verbose) {
|
||||
System.out.println("your system could make a sync every " + timeWait + " nanoseconds, and this will be your timeout");
|
||||
context.out.println("your system could make a sync every " + timeWait + " nanoseconds, and this will be your timeout");
|
||||
}
|
||||
|
||||
return timeWait;
|
||||
|
|
|
@ -169,9 +169,9 @@ public class ArtemisTest extends CliTestBase {
|
|||
public void testSync() throws Exception {
|
||||
int writes = 2;
|
||||
int tries = 5;
|
||||
long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true, true, "file.tmp", 1, JournalType.NIO);
|
||||
long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true, true, "file.tmp", 1, JournalType.NIO, new TestActionContext());
|
||||
logger.debug("TotalAvg = {}", totalAvg);
|
||||
long nanoTime = SyncCalculation.toNanos(totalAvg, writes, false);
|
||||
long nanoTime = SyncCalculation.toNanos(totalAvg, writes, false, null);
|
||||
logger.debug("nanoTime avg = {}", nanoTime);
|
||||
assertEquals(0, LibaioContext.getTotalMaxIO());
|
||||
|
||||
|
|
Loading…
Reference in New Issue