mirror of https://github.com/apache/druid.git
Adding a test producer application.
This commit is contained in:
parent
da658ac69a
commit
ae4132adba
|
@ -0,0 +1,182 @@
|
|||
package druid.examples.rabbitmq;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.ConnectionFactory;
|
||||
import org.apache.commons.cli.*;
|
||||
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class RabbitMQProducerMain
|
||||
{
|
||||
public static void main(String[] args)
|
||||
throws Exception
|
||||
{
|
||||
// We use a List to keep track of option insertion order. See below.
|
||||
final List<Option> optionList = new ArrayList<Option>();
|
||||
|
||||
optionList.add(OptionBuilder.withLongOpt("help")
|
||||
.withDescription("display this help message")
|
||||
.create("h"));
|
||||
optionList.add(OptionBuilder.withLongOpt("hostname")
|
||||
.hasArg()
|
||||
.withDescription("the hostname of the AMQP broker [defaults to AMQP library default]")
|
||||
.create("b"));
|
||||
optionList.add(OptionBuilder.withLongOpt("port")
|
||||
.hasArg()
|
||||
.withDescription("the port of the AMQP broker [defaults to AMQP library default]")
|
||||
.create("n"));
|
||||
optionList.add(OptionBuilder.withLongOpt("username")
|
||||
.hasArg()
|
||||
.withDescription("username to connect to the AMQP broker [defaults to AMQP library default]")
|
||||
.create("u"));
|
||||
optionList.add(OptionBuilder.withLongOpt("password")
|
||||
.hasArg()
|
||||
.withDescription("password to connect to the AMQP broker [defaults to AMQP library default]")
|
||||
.create("p"));
|
||||
optionList.add(OptionBuilder.withLongOpt("vhost")
|
||||
.hasArg()
|
||||
.withDescription("name of virtual host on the AMQP broker [defaults to AMQP library default]")
|
||||
.create("v"));
|
||||
optionList.add(OptionBuilder.withLongOpt("exchange")
|
||||
.isRequired()
|
||||
.hasArg()
|
||||
.withDescription("name of the AMQP exchange [required - no default]")
|
||||
.create("e"));
|
||||
optionList.add(OptionBuilder.withLongOpt("key")
|
||||
.hasArg()
|
||||
.withDescription("the routing key to use when sending messages [default: 'default.routing.key']")
|
||||
.create("k"));
|
||||
optionList.add(OptionBuilder.withLongOpt("type")
|
||||
.hasArg()
|
||||
.withDescription("the type of exchange to create [default: 'topic']")
|
||||
.create("t"));
|
||||
optionList.add(OptionBuilder.withLongOpt("durable")
|
||||
.withDescription("if set, a durable exchange will be declared [default: not set]")
|
||||
.create("d"));
|
||||
optionList.add(OptionBuilder.withLongOpt("autodelete")
|
||||
.withDescription("if set, an auto-delete exchange will be declared [default: not set]")
|
||||
.create("a"));
|
||||
optionList.add(OptionBuilder.withLongOpt("single")
|
||||
.withDescription("if set, only a single message will be sent [default: not set]")
|
||||
.create("s"));
|
||||
optionList.add(OptionBuilder.withLongOpt("start")
|
||||
.hasArg()
|
||||
.withDescription("time to use to start sending messages from [default: 2010-01-01T00:00:00]")
|
||||
.create());
|
||||
optionList.add(OptionBuilder.withLongOpt("stop")
|
||||
.hasArg()
|
||||
.withDescription("time to use to send messages until (format: '2013-07-18T23:45:59') [default: current time]")
|
||||
.create());
|
||||
optionList.add(OptionBuilder.withLongOpt("interval")
|
||||
.hasArg()
|
||||
.withDescription("the interval to add to the timestamp between messages in seconds [default: 10]")
|
||||
.create());
|
||||
optionList.add(OptionBuilder.withLongOpt("delay")
|
||||
.hasArg()
|
||||
.withDescription("the delay between sending messages in milliseconds [default: 100]")
|
||||
.create());
|
||||
|
||||
// An extremely silly hack to maintain the above order in the help formatting.
|
||||
HelpFormatter formatter = new HelpFormatter();
|
||||
// Add a comparator to the HelpFormatter using the ArrayList above to sort by insertion order.
|
||||
formatter.setOptionComparator(new Comparator(){
|
||||
@Override
|
||||
public int compare(Object o1, Object o2)
|
||||
{
|
||||
// I know this isn't fast, but who cares! The list is short.
|
||||
return optionList.indexOf(o1) - optionList.indexOf(o2);
|
||||
}
|
||||
});
|
||||
|
||||
// Now we can add all the options to an Options instance. This is dumb!
|
||||
Options options = new Options();
|
||||
for (Option option : optionList) {
|
||||
options.addOption(option);
|
||||
}
|
||||
|
||||
CommandLine cmd = null;
|
||||
|
||||
try{
|
||||
cmd = new BasicParser().parse(options, args);
|
||||
|
||||
}
|
||||
catch(ParseException e){
|
||||
formatter.printHelp("RabbitMQProducerMain", e.getMessage(), options, null);
|
||||
System.exit(1);
|
||||
}
|
||||
|
||||
if(cmd.hasOption("h")) {
|
||||
formatter.printHelp("RabbitMQProducerMain", options);
|
||||
System.exit(2);
|
||||
}
|
||||
|
||||
ConnectionFactory factory = new ConnectionFactory();
|
||||
|
||||
if(cmd.hasOption("b")){
|
||||
factory.setHost(cmd.getOptionValue("b"));
|
||||
}
|
||||
if(cmd.hasOption("u")){
|
||||
factory.setUsername(cmd.getOptionValue("u"));
|
||||
}
|
||||
if(cmd.hasOption("p")){
|
||||
factory.setPassword(cmd.getOptionValue("p"));
|
||||
}
|
||||
if(cmd.hasOption("v")){
|
||||
factory.setVirtualHost(cmd.getOptionValue("v"));
|
||||
}
|
||||
if(cmd.hasOption("n")){
|
||||
factory.setPort(Integer.parseInt(cmd.getOptionValue("n")));
|
||||
}
|
||||
|
||||
String exchange = cmd.getOptionValue("e");
|
||||
String routingKey = "default.routing.key";
|
||||
if(cmd.hasOption("k")){
|
||||
routingKey = cmd.getOptionValue("k");
|
||||
}
|
||||
|
||||
boolean durable = cmd.hasOption("d");
|
||||
boolean autoDelete = cmd.hasOption("a");
|
||||
String type = cmd.getOptionValue("t", "topic");
|
||||
boolean single = cmd.hasOption("single");
|
||||
int interval = Integer.parseInt(cmd.getOptionValue("interval", "10"));
|
||||
int delay = Integer.parseInt(cmd.getOptionValue("delay", "100"));
|
||||
|
||||
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
|
||||
Date stop = sdf.parse(cmd.getOptionValue("stop", sdf.format(new Date())));
|
||||
|
||||
Random r = new Random();
|
||||
Calendar timer = Calendar.getInstance();
|
||||
timer.setTime(sdf.parse(cmd.getOptionValue("start", "2010-01-01T00:00:00")));
|
||||
|
||||
String msg_template = "{\"utcdt\": \"%s\", \"wp\": %d, \"gender\": \"%s\", \"age\": %d}";
|
||||
|
||||
Connection connection = factory.newConnection();
|
||||
Channel channel = connection.createChannel();
|
||||
|
||||
channel.exchangeDeclare(exchange, type, durable, autoDelete, null);
|
||||
|
||||
do{
|
||||
int wp = (10 + r.nextInt(90)) * 100;
|
||||
String gender = r.nextBoolean() ? "male" : "female";
|
||||
int age = 20 + r.nextInt(70);
|
||||
|
||||
String line = String.format(msg_template, sdf.format(timer.getTime()), wp, gender, age);
|
||||
|
||||
channel.basicPublish(exchange, routingKey, null, line.getBytes());
|
||||
|
||||
System.out.println("Sent message: " + line);
|
||||
|
||||
timer.add(Calendar.SECOND, interval);
|
||||
|
||||
Thread.sleep(delay);
|
||||
}while((!single && stop.after(timer.getTime())));
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue