File tree Expand file tree Collapse file tree 3 files changed +50
-8
lines changed
Expand file tree Collapse file tree 3 files changed +50
-8
lines changed Original file line number Diff line number Diff line change 3232 (rx state signal-name)))
3333
3434(defn <!
35- " Light-weight/parking receive of a single message"
35+ " Light-weight/parking receive of a single message with an optional timeout "
3636 ([state] (<! state ::default ))
37- ([state signal-name]
38- (log/trace " waiting on:" signal-name)
39- (w/await #(not (is-empty? state signal-name)))
40- (rx state signal-name)))
37+ ([state signal-name] (<! state signal-name nil ))
38+ ([state signal-name timeout]
39+ (log/trace " waiting on:" signal-name " with timeout" timeout)
40+ (let [pred #(not (is-empty? state signal-name))]
41+ (if (some? timeout)
42+ (do
43+ (when (w/await timeout pred)
44+ (rx state signal-name)))
45+ (do
46+ (w/await pred)
47+ (rx state signal-name))))))
4148
4249(defn >!
4350 " Sends `payload` to `workflow-id` via signal `signal-name`."
Original file line number Diff line number Diff line change 66 [temporal.internal.utils :as u]
77 [temporal.internal.workflow :as w])
88 (:import [io.temporal.workflow Workflow]
9- [java.util.function Supplier]))
9+ [java.util.function Supplier]
10+ [java.time Duration]))
1011
1112(defn get-info
1213 " Return info about the current workflow"
1314 []
1415 (w/get-info ))
1516
1617(defn- ->supplier
17- [f]
18+ ^Supplier [f]
1819 (reify Supplier
1920 (get [_]
2021 (f ))))
2324 " Efficiently parks the workflow until 'pred' evaluates to true. Re-evaluates on each state transition"
2425 ([pred]
2526 (Workflow/await (->supplier pred)))
26- ([duration pred]
27+ ([^Duration duration pred]
2728 (Workflow/await duration (->supplier pred))))
2829
2930(defmacro defworkflow
Original file line number Diff line number Diff line change 1+ ; ; Copyright © 2022 Manetu, Inc. All rights reserved
2+
3+ (ns temporal.test.signal-timeout
4+ (:require [clojure.test :refer :all ]
5+ [taoensso.timbre :as log]
6+ [temporal.client.core :refer [>!] :as c]
7+ [temporal.signals :refer [<!]]
8+ [temporal.workflow :refer [defworkflow ]]
9+ [temporal.test.utils :as t])
10+ (:import [java.time Duration]))
11+
12+ (use-fixtures :once t/wrap-service)
13+
14+ (def signal-name ::signal )
15+
16+ (defworkflow timeout-workflow
17+ [ctx {:keys [signals] :as args}]
18+ (log/info " timeout-workflow:" args)
19+ (or (<! signals signal-name (Duration/ofSeconds 1 ))
20+ :timed-out ))
21+
22+ (defn create []
23+ (let [wf (c/create-workflow (t/get-client ) timeout-workflow {:task-queue t/task-queue})]
24+ (c/start wf nil )
25+ wf))
26+
27+ (deftest the-test
28+ (testing " Verifies that signals may timeout properly"
29+ (let [wf (create )]
30+ (is (= @(c/get-result wf) :timed-out ))))
31+ (testing " Verifies that signals are received properly even when a timeout is requested"
32+ (let [wf (create )]
33+ (>! wf ::signal " Hi, Bob" )
34+ (is (= @(c/get-result wf) " Hi, Bob" )))))
You can’t perform that action at this time.
0 commit comments