diff --git a/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs b/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs index dcddfa85b7518..684682e68fa71 100644 --- a/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs +++ b/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs @@ -17,7 +17,6 @@ The delivery mechanism is similar to Event Triggers; see "Hasura.Eventing.EventT module Hasura.Eventing.ScheduledTrigger ( processScheduledQueue , runScheduledEventsGenerator - , ScheduledEventSeed(..) , generateScheduleTimes , insertScheduledEvents @@ -29,6 +28,7 @@ import Data.Has import Data.Int (Int64) import Data.List (unfoldr) import Data.Time.Clock +import Data.Time.LocalTime (TimeZone(..),minutesToTimeZone) import Hasura.Eventing.HTTP import Hasura.Prelude import Hasura.RQL.DDL.Headers @@ -206,14 +206,28 @@ generateScheduledEventsFrom startTime ScheduledTriggerInfo{..} = let events = case stiSchedule of AdHoc _ -> empty -- ad-hoc scheduled events are created through 'create_scheduled_event' API - Cron cron -> generateScheduleTimes startTime 100 cron -- by default, generate next 100 events - in map (ScheduledEventSeed stiName) events + + Cron cron tz -> generateScheduleTimes startTime tz 100 cron + + in map (ScheduledEventSeed stiName) events + +addOffsetToUTCTime :: UTCTime -> TimeZone -> UTCTime +addOffsetToUTCTime ut (TimeZone mins _ _) = + addUTCTime (realToFrac $ (mins * 60)) ut -- | Generates next @n events starting @from according to 'CronSchedule' -generateScheduleTimes :: UTCTime -> Int -> CronSchedule -> [UTCTime] -generateScheduleTimes from n cron = take n $ go from +-- When Timezone is not Nothing, the offset will be added to the `from` value +-- then the cron schedules are generated and then the offset will be subtracted +-- from the generated timestamps. +generateScheduleTimes :: UTCTime -> Maybe TimeZone -> Int -> CronSchedule -> [UTCTime] +generateScheduleTimes from timezone n cron = + case timezone of + Nothing -> take n $ go from + Just tz@(TimeZone mins _ _) -> + map (\t -> addOffsetToUTCTime t (inverseTimeZone mins)) $ take n $ go $ addOffsetToUTCTime from tz where go = unfoldr (fmap dup . nextMatch cron) + inverseTimeZone mins = minutesToTimeZone (-1 * mins) processScheduledQueue :: HasVersion diff --git a/server/src-lib/Hasura/Incremental/Internal/Dependency.hs b/server/src-lib/Hasura/Incremental/Internal/Dependency.hs index e69503ba49bcf..d19121115f296 100644 --- a/server/src-lib/Hasura/Incremental/Internal/Dependency.hs +++ b/server/src-lib/Hasura/Incremental/Internal/Dependency.hs @@ -22,6 +22,7 @@ import Data.Vector (Vector) import GHC.Generics ((:*:) (..), (:+:) (..), Generic (..), K1 (..), M1 (..), U1 (..), V1) import System.Cron.Types +import Data.Time.LocalTime (TimeZone(..)) import Hasura.Incremental.Select @@ -166,6 +167,8 @@ instance Cacheable N.URIAuth where unchanged _ = (==) instance Cacheable DiffTime where unchanged _ = (==) instance Cacheable NominalDiffTime where unchanged _ = (==) instance Cacheable UTCTime where unchanged _ = (==) +instance Cacheable TimeZone where unchanged _ = (==) + -- instances for CronSchedule from package `cron` instance Cacheable StepField diff --git a/server/src-lib/Hasura/RQL/DDL/ScheduledTrigger.hs b/server/src-lib/Hasura/RQL/DDL/ScheduledTrigger.hs index dc7dae82d6fcb..7349b40a6fd2f 100644 --- a/server/src-lib/Hasura/RQL/DDL/ScheduledTrigger.hs +++ b/server/src-lib/Hasura/RQL/DDL/ScheduledTrigger.hs @@ -55,9 +55,9 @@ addScheduledTriggerToCatalog CreateScheduledTrigger {..} = liftTx $ do (name, scheduled_time) VALUES ($1, $2) |] (stName, timestamp) False - Cron cron -> do + Cron cron tz -> do currentTime <- liftIO C.getCurrentTime - let scheduleTimes = generateScheduleTimes currentTime 100 cron -- generate next 100 events + let scheduleTimes = generateScheduleTimes currentTime tz 100 cron -- generate next 100 events events = map (ScheduledEventSeed stName) scheduleTimes insertScheduledEvents events _ -> pure () diff --git a/server/src-lib/Hasura/RQL/Types/Catalog.hs b/server/src-lib/Hasura/RQL/Types/Catalog.hs index 14309a9cd7476..8256b8917c4c2 100644 --- a/server/src-lib/Hasura/RQL/Types/Catalog.hs +++ b/server/src-lib/Hasura/RQL/Types/Catalog.hs @@ -22,6 +22,7 @@ import qualified Data.HashMap.Strict as M import Data.Aeson import Data.Aeson.Casing import Data.Aeson.TH +import Data.Time.LocalTime (TimeZone(..)) import Hasura.Incremental (Cacheable) import Hasura.RQL.DDL.ComputedField @@ -38,6 +39,8 @@ import Hasura.RQL.Types.SchemaCache import Hasura.RQL.Types.ScheduledTrigger import Hasura.SQL.Types +import qualified Database.PG.Query as Q + newtype CatalogForeignKey = CatalogForeignKey { unCatalogForeignKey :: ForeignKey diff --git a/server/src-lib/Hasura/RQL/Types/ScheduledTrigger.hs b/server/src-lib/Hasura/RQL/Types/ScheduledTrigger.hs index 53f229ac0b876..9f433dc9ea061 100644 --- a/server/src-lib/Hasura/RQL/Types/ScheduledTrigger.hs +++ b/server/src-lib/Hasura/RQL/Types/ScheduledTrigger.hs @@ -16,13 +16,16 @@ import Data.Time.Format import Data.Aeson import Data.Aeson.Casing import Data.Aeson.TH +import Data.Char import Hasura.Prelude import System.Cron.Types import Hasura.Incremental +import Data.Time.LocalTime (TimeZone(..),minutesToTimeZone) import qualified Data.Text as T import qualified Data.Aeson as J import qualified Hasura.RQL.Types.EventTrigger as ET +import qualified Database.PG.Query as Q data RetryConfST = RetryConfST @@ -46,7 +49,7 @@ defaultRetryConfST = , rcstTolerance = 21600 -- 6 hours } -data ScheduleType = Cron CronSchedule | AdHoc (Maybe UTCTime) +data ScheduleType = Cron CronSchedule (Maybe TimeZone) | AdHoc (Maybe UTCTime) deriving (Show, Eq, Generic) instance NFData ScheduleType @@ -57,15 +60,47 @@ instance FromJSON ScheduleType where withObject "ScheduleType" $ \o -> do type' <- o .: "type" case type' of - String "cron" -> Cron <$> o .: "value" + String "cron" -> Cron <$> o .: "value" <*> o .:? "utc-offset" String "adhoc" -> AdHoc <$> o .:? "value" _ -> fail "expected type to be cron or adhoc" instance ToJSON ScheduleType where - toJSON (Cron cs) = object ["type" .= String "cron", "value" .= toJSON cs] + toJSON (Cron cs (Just offset)) = object ["type" .= String "cron", "value" .= toJSON cs, "utc-offset" .= (show offset)] + toJSON (Cron cs Nothing) = object ["type" .= String "cron", "value" .= toJSON cs] toJSON (AdHoc (Just ts)) = object ["type" .= String "adhoc", "value" .= toJSON ts] toJSON (AdHoc Nothing) = object ["type" .= String "adhoc"] +-- convertUTCOffsetToTimeZone can take an offset in any one of +-- the following formats: +-- HHMM,HH:MM,(+/-)HHMM +-- If the length of the offset is 4, then it's assumed that it's a +-- positive offset. +convertUTCOffsetToTimeZone :: String -> Either String TimeZone +convertUTCOffsetToTimeZone offset + | length offset == 4 = convertUTCOffsetToTimeZone ('+':offset) +convertUTCOffsetToTimeZone (h1:h2:':':m1:m2:"") = + convertUTCOffsetToTimeZone('+':h1:h2:m1:m2:"") +convertUTCOffsetToTimeZone (p:h1:h2:':':m1:m2:"") = + convertUTCOffsetToTimeZone(p:h1:h2:m1:m2:"") +convertUTCOffsetToTimeZone ('+':h1:h2:m1:m2:"") + | and [(isDigit h1),(isDigit h2),(isDigit m1),(isDigit m2)] = + let mins = (10 * (digitToInt h1) + (digitToInt h2)) * 60 + + (10 * (digitToInt m1) + (digitToInt m2)) + in Right $ TimeZone mins False ('+':h1:h2:m1:m2:"") + | otherwise = Left "Invalid TimeZone Format" +convertUTCOffsetToTimeZone ('-':h1:h2:m1:m2:"") = + case convertUTCOffsetToTimeZone ('+':h1:h2:m1:m2:"") of + Left msg -> Left msg + Right (TimeZone mins isSummerOnly ('+':offset)) -> Right (TimeZone (-1 * mins) isSummerOnly ('-':offset)) +convertUTCOffsetToTimeZone _ = Left "Invalid TimeZone Format" + +instance FromJSON TimeZone where + parseJSON = withText "TimeZone" $ \o -> + either fail pure $ convertUTCOffsetToTimeZone $ T.unpack o + +instance ToJSON TimeZone where + toJSON (TimeZone _ _ offset) = String . T.pack $ offset + data CreateScheduledTrigger = CreateScheduledTrigger { stName :: !ET.TriggerName @@ -88,6 +123,7 @@ instance FromJSON CreateScheduledTrigger where stSchedule <- o .: "schedule" stRetryConf <- o .:? "retry_conf" .!= defaultRetryConfST stHeaders <- o .:? "headers" .!= [] + pure CreateScheduledTrigger {..} $(deriveToJSON (aesonDrop 2 snakeCase){omitNothingFields=True} ''CreateScheduledTrigger) diff --git a/server/tests-py/test_scheduled_triggers.py b/server/tests-py/test_scheduled_triggers.py index 72efe11ec8005..6078d1c21decd 100644 --- a/server/tests-py/test_scheduled_triggers.py +++ b/server/tests-py/test_scheduled_triggers.py @@ -5,6 +5,7 @@ from croniter import croniter from validate import validate_event_webhook,validate_event_headers from queue import Empty +from pytz import timezone import time def stringify_datetime(dt): @@ -25,9 +26,73 @@ def get_events_of_scheduled_trigger(hge_ctx,trigger_name): class TestScheduledTriggerCron(object): cron_trigger_name = "a_scheduled_trigger" + cron_trigger_with_offset = cron_trigger_name + "_offset" webhook_payload = {"foo":"baz"} webhook_path = "/hello" url = '/v1/query' + timezone_region = "Asia/Kolkata" + offset_at_timezone_region = "+05:30" + + def test_create_cron_schedule_triggers_with_offset(self,hge_ctx): + # setting the test to be after 30 mins, to make sure that + # any of the events are not triggered. + local_now = datetime.now().astimezone(timezone(self.timezone_region)) + min_after_30_mins = (local_now + timedelta(minutes=30)).minute + TestScheduledTriggerCron.cron_schedule = "{} * * * *".format(min_after_30_mins) + + cron_st_api_query = { + "type":"create_scheduled_trigger", + "args":{ + "name":self.cron_trigger_with_offset, + "webhook":"http://127.0.0.1:5594" + "/foo", + "schedule":{ + "type":"cron", + "value":self.cron_schedule, + "utc-offset":self.offset_at_timezone_region + }, + "headers":[ + { + "name":"foo", + "value":"baz" + } + ], + "payload":self.webhook_payload + } + } + headers = {} + if hge_ctx.hge_key is not None: + headers['X-Hasura-Admin-Secret'] = hge_ctx.hge_key + cron_st_code,cron_st_resp,_ = hge_ctx.anyq(self.url,cron_st_api_query,headers) + TestScheduledTriggerCron.init_time_with_offset = datetime.now().astimezone(timezone(self.timezone_region)) # the cron events will be generated based on the current time, they will not be exactly the same though(the server now and now here) + assert cron_st_code == 200 + assert cron_st_resp['message'] == 'success' + + def test_check_generated_cron_scheduled_events_with_offset(self,hge_ctx): + expected_schedule_timestamps = [] + iter = croniter(self.cron_schedule,self.init_time_with_offset) + for i in range(100): + dt = iter.next(datetime) + expected_schedule_timestamps.append(datetime.timestamp(dt)) + sql = ''' + select timezone('{}',scheduled_time) as scheduled_time + from hdb_catalog.hdb_scheduled_events where + name = '{}' order by scheduled_time asc; + ''' + q = { + "type":"run_sql", + "args":{ + "sql":sql.format(self.timezone_region, self.cron_trigger_with_offset) + } + } + st,resp = hge_ctx.v1q(q) + assert st == 200 + ts_resp = resp['result'][1:] + assert len(ts_resp) == 100 # 100 events are generated in a cron ST + db_timestamps = [] + for ts in ts_resp: + datetime_ts = datetime.strptime(ts[0],"%Y-%m-%d %H:%M:%S") + db_timestamps.append(datetime.timestamp(datetime_ts)) + assert db_timestamps == expected_schedule_timestamps def test_create_cron_schedule_triggers(self,hge_ctx): # setting the test to be after 30 mins, to make sure that @@ -90,14 +155,15 @@ def test_check_generated_cron_scheduled_events(self,hge_ctx): assert future_schedule_timestamps == scheduled_events_ts def test_delete_cron_scheduled_trigger(self,hge_ctx): - q = { - "type":"delete_scheduled_trigger", - "args":{ - "name":self.cron_trigger_name + for trigger_name in [self.cron_trigger_name,self.cron_trigger_with_offset]: + q = { + "type":"delete_scheduled_trigger", + "args":{ + "name":trigger_name + } } - } - st,resp = hge_ctx.v1q(q) - assert st == 200,resp + st,resp = hge_ctx.v1q(q) + assert st == 200,resp class ScheduledEventNotFound(Exception): pass