From bfae07338c8e4e51da3b2cfb247af9b8a19e4c64 Mon Sep 17 00:00:00 2001 From: Hagen Rother Date: Tue, 14 Jan 2014 11:11:04 +0100 Subject: [PATCH] make ServerTimeRejectionPolicy also reject timestamps AHEAD of server time --- .../realtime/plumber/ServerTimeRejectionPolicyFactory.java | 7 ++++++- .../plumber/ServerTimeRejectionPolicyFactoryTest.java | 2 ++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/ServerTimeRejectionPolicyFactory.java b/server/src/main/java/io/druid/segment/realtime/plumber/ServerTimeRejectionPolicyFactory.java index 98eb2d20741..198717b241a 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/ServerTimeRejectionPolicyFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/ServerTimeRejectionPolicyFactory.java @@ -40,7 +40,12 @@ public class ServerTimeRejectionPolicyFactory implements RejectionPolicyFactory @Override public boolean accept(long timestamp) { - return timestamp >= (System.currentTimeMillis() - windowMillis); + long now = System.currentTimeMillis(); + + boolean notTooOld = timestamp >= (now - windowMillis); + boolean notTooYoung = timestamp <= (now + windowMillis); + + return notTooOld && notTooYoung; } @Override diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/ServerTimeRejectionPolicyFactoryTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/ServerTimeRejectionPolicyFactoryTest.java index 061e3325159..14212705f82 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/ServerTimeRejectionPolicyFactoryTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/ServerTimeRejectionPolicyFactoryTest.java @@ -37,8 +37,10 @@ public class ServerTimeRejectionPolicyFactoryTest DateTime now = new DateTime(); DateTime past = now.minus(period).minus(1); + DateTime future = now.plus(period).plus(1); Assert.assertTrue(rejectionPolicy.accept(now.getMillis())); Assert.assertFalse(rejectionPolicy.accept(past.getMillis())); + Assert.assertFalse(rejectionPolicy.accept(future.getMillis())); } }