Skip to content

Commit d73035e

Browse files
committed
Add register-signal-handler! and signal-channel abstraction
This patch breaks out the signal handling such that the core.async abstraction surrounding signals is now optional. There is a new function: temporal.signals/register-signal-handler! that offers low-level access to the signal callback. The design was inspired by the work of Thomas Moerman (https://github.com/tmoerman) in the Query support (See 4f052ae) This work prompted another cleanup that I had been meaning to do: the removal of the 'ctx' from the defworkflow. The original design had both defworkflow and defactivity receiving a user supplied 'ctx', but this never made sense for workflows since the context invites non-determinism. So, the new signature of defworkflow is simply a single-arity function that receives the arguments directly, rather than within a {:keys [signals args]} map. We still support 2-arity clients (for now) with a backwards compatibility check within the macro. This may be removed in a future release, so we print a WARN to signal the developer of the deprecation. Signed-off-by: Greg Haskins <[email protected]>
1 parent 7de591d commit d73035e

30 files changed

+214
-87
lines changed

doc/clients.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ As a simple example, for the following Workflow implementation:
3333
(str "Hi, " name))
3434

3535
(defworkflow greeter-workflow
36-
[ctx {:keys [args]}]
36+
[args]
3737
(log/info "greeter-workflow:" args)
3838
@(a/invoke greet-activity args))
3939
```

doc/testing.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ You can use the provided environment with a Clojure unit testing framework of yo
2525
(str "Hi, " name))
2626

2727
(defworkflow greeter-workflow
28-
[ctx {:keys [args]}]
28+
[args]
2929
(log/info "greeter-workflow:" args)
3030
@(a/invoke greet-activity args))
3131

doc/workers.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ As a simple example, let's say we want our Worker to be able to execute the foll
2525
(str "Hi, " name))
2626

2727
(defworkflow greeter-workflow
28-
[ctx {:keys [args]}]
28+
[args]
2929
(log/info "greeter-workflow:" args)
3030
@(a/invoke greet-activity args))
3131
```

doc/workflows.md

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ In this Clojure SDK programming model, a Temporal Workflow is a function declare
1010

1111
```clojure
1212
(defworkflow my-workflow
13-
[ctx params]
13+
[params]
1414
...)
1515
```
1616

@@ -24,7 +24,7 @@ A Workflow implementation consists of defining a (defworkflow) function. The pl
2424
(require '[temporal.workflow :refer [defworkflow]])
2525

2626
(defworkflow my-workflow
27-
[ctx {{:keys [foo]} :args}]
27+
[{:keys [foo]}]
2828
...)
2929
```
3030

@@ -65,7 +65,7 @@ In this Clojure SDK, developers manage Workflows with the following flow:
6565

6666
```clojure
6767
(defworkflow my-workflow
68-
[ctx {{:keys [foo]} :args}]
68+
[{:keys [foo]}]
6969
...)
7070

7171
(let [w (create-workflow client my-workflow {:task-queue "MyTaskQueue"})]
@@ -155,24 +155,45 @@ Your Workflow may send or receive [signals](https://cljdoc.org/d/io.github.manet
155155

156156
#### Receiving Signals
157157

158-
Your Workflow may either block waiting with signals with [temporal.signals/<!](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.signals#%3C!) or use the non-blocking [temporal.signals/poll](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.signals#poll). Either way, your Workflow needs to obtain the `signals` context provided in the Workflow request map.
158+
##### Channel Abstraction using 'signal-chan'
159159

160-
##### Example
160+
This SDK provides a [core.async](https://github.com/clojure/core.async) inspired abstraction on [Temporal Signals](https://docs.temporal.io/workflows#signal) called signal-channels. To use signal channels, your Workflow may either block waiting with signals with [temporal.signals/<!](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.signals#%3C!) or use the non-blocking [temporal.signals/poll](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.signals#poll). Either way, your Workflow needs to first obtain the `signal-chan` context obtained by [temporal.signals/create-signal-chan](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.signals#create-signal-chan).
161+
162+
###### Example
161163

162164
```clojure
165+
(require `[temporal.signals :as s])
166+
163167
(defworkflow my-workflow
164-
[ctx {:keys [signals]}]
165-
(let [message (<! signals "MySignal")]
168+
[args]
169+
(let [signals (s/create-signal-chan)
170+
message (<! signals "MySignal")]
166171
...))
167172
```
168173

174+
##### Raw Signal Handling
175+
176+
Alternatively, you may opt to handle signals directly with [temporal.signals/register-signal-handler!](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.signals#register-signal-handler!)
177+
178+
###### Example
179+
180+
```clojure
181+
(defworkflow my-workflow
182+
[args]
183+
(let [state (atom 0)]
184+
(s/register-signal-handler! (fn [signal-name args]
185+
(swap! state inc)))
186+
(w/await (fn [] (> @state 1)))
187+
@state))
188+
```
189+
169190
### Temporal Queries
170191

171192
Your Workflow may respond to [queries](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.client.core#query).
172193

173194
A temporal query is similar to a temporal signal, both are messages sent to a running Workflow.
174195
The difference is that a signal intends to change the behaviour of the Workflow, whereas a query intends to inspect the current state of the Workflow.
175-
Querying the state of a Workflow implies that the Workflow must maintain state while running, typically in a clojure [atom](https://clojuredocs.org/clojure.core/atom).
196+
Querying the state of a Workflow implies that the Workflow must maintain state while running, typically in a clojure [atom](https://clojuredocs.org/clojure.core/atom) or [ref](https://clojure.org/reference/refs).
176197

177198
#### Registering a Query handler
178199

@@ -181,7 +202,7 @@ The query handler is a function that has a reference to the Workflow state, usua
181202

182203
```clojure
183204
(defworkflow stateful-workflow
184-
[ctx {:keys [signals] {:keys [init] :as args} :args :as params}]
205+
[{:keys [init] :as args}]
185206
(let [state (atom init)]
186207
(register-query-handler! (fn [query-type args]
187208
(when (= query-type :my-query)

src/temporal/internal/signals.clj

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,16 @@
2121
(.add ch payload)
2222
(assoc s signal-name ch)))))
2323

24-
(defn create
25-
"Registers the calling workflow to receive signals and returns a context variable to be passed back later"
24+
(defn register-signal-handler!
25+
[f]
26+
(Workflow/registerListener
27+
(reify
28+
DynamicSignalHandler
29+
(handle [_ signal-name args]
30+
(f signal-name (u/->args args))))))
31+
32+
(defn create-signal-chan
2633
[]
2734
(let [state (atom {})]
28-
(Workflow/registerListener
29-
(reify
30-
DynamicSignalHandler
31-
(handle [_ signal-name args]
32-
(-handle state signal-name (u/->args args)))))
35+
(register-signal-handler! (partial -handle state))
3336
state))

src/temporal/internal/utils.clj

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,16 @@
7575
(verify-registered-fns data)
7676
(m/index-by :name data)))
7777

78+
(defn find-dispatch
79+
"Finds any dispatch descriptor named 't' that carry metadata 'marker'"
80+
[dispatch-table t]
81+
(or (get dispatch-table t)
82+
(throw (ex-info "workflow/activity not found" {:function t}))))
83+
7884
(defn find-dispatch-fn
7985
"Finds any functions named 't' that carry metadata 'marker'"
8086
[dispatch-table t]
81-
(:fn (or (get dispatch-table t)
82-
(throw (ex-info "workflow/activity not found" {:function t})))))
87+
(:fn (find-dispatch dispatch-table t)))
8388

8489
(defn import-dispatch
8590
[marker coll]

src/temporal/internal/workflow.clj

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,13 @@
5353
[ctx dispatch args]
5454
(try
5555
(let [{:keys [workflow-type workflow-id]} (get-info)
56-
f (u/find-dispatch-fn dispatch workflow-type)
56+
d (u/find-dispatch dispatch workflow-type)
57+
f (:fn d)
5758
a (u/->args args)
5859
_ (log/trace workflow-id "calling" f "with args:" a)
59-
r (f ctx {:args a :signals (s/create)})]
60+
r (if (-> d :type (= :legacy))
61+
(f ctx {:args a :signals (s/create-signal-chan)})
62+
(f a))]
6063
(log/trace workflow-id "result:" r)
6164
(nippy/freeze r))
6265
(catch Exception e

src/temporal/signals.clj

Lines changed: 49 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,46 +9,77 @@
99
(:import [io.temporal.workflow Workflow]))
1010

1111
(defn is-empty?
12-
"Returns 'true' if 'signal-name' either doesn't exist or exists but has no pending messages"
13-
[state signal-name]
12+
"Returns 'true' if 'signal-name' either doesn't exist within signal-chan or exists but has no pending messages"
13+
[signal-chan signal-name]
1414
(let [signal-name (u/namify signal-name)
15-
ch (s/get-ch @state signal-name)
15+
ch (s/get-ch @signal-chan signal-name)
1616
r (or (nil? ch) (.isEmpty ch))]
17-
(log/trace "is-empty?:" @state signal-name r)
17+
(log/trace "is-empty?:" @signal-chan signal-name r)
1818
r))
1919

2020
(defn- rx
21-
[state signal-name]
21+
[signal-chan signal-name]
2222
(let [signal-name (u/namify signal-name)
23-
ch (s/get-ch @state signal-name)
23+
ch (s/get-ch @signal-chan signal-name)
2424
m (.poll ch)]
2525
(log/trace "rx:" signal-name m)
2626
m))
2727

2828
(defn poll
29-
"Non-blocking check of the signal. Consumes and returns a message if found, otherwise returns 'nil'"
30-
[state signal-name]
31-
(when-not (is-empty? state signal-name)
32-
(rx state signal-name)))
29+
"Non-blocking check of the signal via signal-chan. Consumes and returns a message if found, otherwise returns 'nil'"
30+
[signal-chan signal-name]
31+
(when-not (is-empty? signal-chan signal-name)
32+
(rx signal-chan signal-name)))
3333

3434
(defn <!
35-
"Light-weight/parking receive of a single message with an optional timeout"
36-
([state] (<! state ::default))
37-
([state signal-name] (<! state signal-name nil))
38-
([state signal-name timeout]
35+
"Light-weight/parking receive of a single message from signal-chan with an optional timeout"
36+
([signal-chan] (<! signal-chan ::default))
37+
([signal-chan signal-name] (<! signal-chan signal-name nil))
38+
([signal-chan signal-name timeout]
3939
(log/trace "waiting on:" signal-name "with timeout" timeout)
40-
(let [pred #(not (is-empty? state signal-name))]
40+
(let [pred #(not (is-empty? signal-chan signal-name))]
4141
(if (some? timeout)
4242
(do
4343
(when (w/await timeout pred)
44-
(rx state signal-name)))
44+
(rx signal-chan signal-name)))
4545
(do
4646
(w/await pred)
47-
(rx state signal-name))))))
47+
(rx signal-chan signal-name))))))
4848

4949
(defn >!
5050
"Sends `payload` to `workflow-id` via signal `signal-name`."
5151
[^String workflow-id signal-name payload]
5252
(let [signal-name (u/namify signal-name)
5353
stub (Workflow/newUntypedExternalWorkflowStub workflow-id)]
54-
(.signal stub signal-name (u/->objarray payload))))
54+
(.signal stub signal-name (u/->objarray payload))))
55+
56+
(def register-signal-handler!
57+
"
58+
Registers a DynamicSignalHandler listener that handles signals sent to the workflow such as with [[>!]].
59+
60+
Use inside a workflow definition with 'f' closing over any desired workflow state (e.g. atom) to mutate
61+
the workflow state.
62+
63+
Arguments:
64+
- `f`: a 2-arity function, expecting 2 arguments.
65+
66+
`f` arguments:
67+
- `signal-name`: string
68+
- `args`: params value or data structure
69+
70+
```clojure
71+
(defworkflow signalled-workflow
72+
[{:keys [init] :as args}]
73+
(let [state (atom init)]
74+
(register-signal-handler! (fn [signal-name args]
75+
(when (= signal-name \"mysignal\")
76+
(update state #(conj % args)))))
77+
;; workflow implementation
78+
))
79+
```"
80+
s/register-signal-handler!)
81+
82+
(def create-signal-chan
83+
"Registers the calling workflow to receive signals and returns a 'signal-channel' context for use with functions such as [[<!]] amd [[poll]]"
84+
s/create-signal-chan)
85+

src/temporal/workflow.clj

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ Arguments:
5050
5151
```clojure
5252
(defworkflow stateful-workflow
53-
[ctx {:keys [signals] {:keys [init] :as args} :args :as params}]
53+
[{:keys [init] :as args}]
5454
(let [state (atom init)]
5555
(register-query-handler! (fn [query-type args]
5656
(when (= query-type :my-query)
@@ -71,19 +71,16 @@ Arguments:
7171

7272
(defmacro defworkflow
7373
"
74-
Defines a new workflow, similar to defn, expecting a 2-arity parameter list and body. Should evaluate to something
74+
Defines a new workflow, similar to defn, expecting a 1-arity parameter list and body. Should evaluate to something
7575
serializable, which will become available for [[temporal.client.core/get-result]].
7676
7777
Arguments:
7878
79-
- `ctx`: Context passed through from [[temporal.client.worker/start]]
80-
- `params`: A map containing the following
81-
- `args`: Passed from 'params' to [[temporal.client.core/start]] or [[temporal.client.core/signal-with-start]]
82-
- `signals`: Signal context for use with signal calls such as [[temporal.signals/<!]] and [[temporal.signals/poll]]
79+
- `args`: Passed from 'params' to [[temporal.client.core/start]] or [[temporal.client.core/signal-with-start]]
8380
8481
```clojure
8582
(defworkflow my-workflow
86-
[ctx {{:keys [foo]} :args}]
83+
[{:keys [foo]}]
8784
...)
8885
8986
(let [w (create-workflow client my-workflow {:task-queue ::my-task-queue})]
@@ -93,8 +90,17 @@ Arguments:
9390
[name params* & body]
9491
(let [fqn (u/get-fq-classname name)
9592
sname (str name)]
96-
`(def ~name ^{::w/def {:name ~sname :fqn ~fqn}}
97-
(fn [ctx# args#]
98-
(log/trace (str ~fqn ": ") args#)
99-
(let [f# (fn ~params* (do ~@body))]
100-
(f# ctx# args#))))))
93+
(if (= (count params*) 2) ;; legacy
94+
(do
95+
(println (str *file* ":" (:line (meta &form)) " WARN: (defworkflow " name ") with [ctx {:keys [signals args]}] is deprecated. Use [args] form."))
96+
`(def ~name ^{::w/def {:name ~sname :fqn ~fqn :type :legacy}}
97+
(fn [ctx# args#]
98+
(log/trace (str ~fqn ": ") args#)
99+
(let [f# (fn ~params* (do ~@body))]
100+
(f# ctx# args#)))))
101+
(do
102+
`(def ~name ^{::w/def {:name ~sname :fqn ~fqn}}
103+
(fn [args#]
104+
(log/trace (str ~fqn ": ") args#)
105+
(let [f# (fn ~params* (do ~@body))]
106+
(f# args#))))))))

test/temporal/test/async.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
(str "Hi, " name))))
2121

2222
(defworkflow async-greeter-workflow
23-
[ctx {:keys [args]}]
23+
[args]
2424
(log/info "greeter-workflow:" args)
2525
@(a/invoke async-greet-activity args {:retry-options {:maximum-attempts 1}}))
2626

0 commit comments

Comments
 (0)