Fixes according to comments.
This commit is contained in:
parent
a0a393cdfc
commit
6fa2361283
@ -22,6 +22,7 @@ public class FileWritingBolt extends BaseRichBolt {
|
|||||||
private BufferedWriter writer;
|
private BufferedWriter writer;
|
||||||
private String filePath;
|
private String filePath;
|
||||||
private ObjectMapper objectMapper;
|
private ObjectMapper objectMapper;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
|
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
|
||||||
objectMapper = new ObjectMapper();
|
objectMapper = new ObjectMapper();
|
||||||
@ -55,6 +56,15 @@ public class FileWritingBolt extends BaseRichBolt {
|
|||||||
this.filePath = filePath;
|
this.filePath = filePath;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void cleanup() {
|
||||||
|
try {
|
||||||
|
writer.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.error("Failed to close the writer!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
|
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
|
||||||
|
|
||||||
|
@ -26,7 +26,7 @@ public class RandomNumberSpout extends BaseRichSpout {
|
|||||||
public void nextTuple() {
|
public void nextTuple() {
|
||||||
Utils.sleep(1000);
|
Utils.sleep(1000);
|
||||||
//This will select random int from the range (-1000, 1000)
|
//This will select random int from the range (-1000, 1000)
|
||||||
int operation = random.nextInt(1000 + 1 + 1000) - 1000;
|
int operation = random.nextInt(101);
|
||||||
long timestamp = System.currentTimeMillis();
|
long timestamp = System.currentTimeMillis();
|
||||||
|
|
||||||
Values values = new Values(operation, timestamp);
|
Values values = new Values(operation, timestamp);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user