Skip to content

Commit 5f01f77

Browse files
committed
Expose temporal interceptors
Signed-off-by: Greg Haskins <[email protected]>
1 parent 12fbef7 commit 5f01f77

File tree

5 files changed

+127
-17
lines changed

5 files changed

+127
-17
lines changed

project.clj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@
2828
:codox {:metadata {:doc/format :markdown}}
2929

3030
:profiles {:dev {:dependencies [[org.clojure/tools.namespace "1.4.4"]
31-
[eftest "0.6.0"]]}}
31+
[eftest "0.6.0"]
32+
[io.temporal/temporal-opentracing "1.22.3"]]}}
3233
:cloverage {:runner :eftest
3334
:runner-opts {:multithread? false
3435
:fail-fast? true}

src/temporal/client/core.clj

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
[temporal.internal.utils :as u])
1010
(:import [java.time Duration]
1111
[io.temporal.client WorkflowClient WorkflowClientOptions WorkflowClientOptions$Builder WorkflowStub]
12-
[io.temporal.serviceclient WorkflowServiceStubs WorkflowServiceStubsOptions WorkflowServiceStubsOptions$Builder]))
12+
[io.temporal.serviceclient WorkflowServiceStubs WorkflowServiceStubsOptions WorkflowServiceStubsOptions$Builder]
13+
[io.temporal.common.interceptors WorkflowClientInterceptorBase]))
1314

1415
(def ^:no-doc stub-options
1516
{:channel #(.setChannel ^WorkflowServiceStubsOptions$Builder %1 %2)
@@ -35,7 +36,8 @@
3536
(def ^:no-doc client-options
3637
{:identity #(.setIdentity ^WorkflowClientOptions$Builder %1 %2)
3738
:namespace #(.setNamespace ^WorkflowClientOptions$Builder %1 %2)
38-
:data-converter #(.setDataConverter ^WorkflowClientOptions$Builder %1 %2)})
39+
:data-converter #(.setDataConverter ^WorkflowClientOptions$Builder %1 %2)
40+
:interceptors #(.setInterceptors ^WorkflowClientOptions$Builder %1 (into-array WorkflowClientInterceptorBase %2))})
3941

4042
(defn ^:no-doc client-options->
4143
^WorkflowClientOptions [params]
@@ -60,6 +62,7 @@ Arguments:
6062
| :identity | Overrides the worker node identity (workers only) | String | |
6163
| :namespace | Sets the Temporal namespace context for this client | String | |
6264
| :data-converter | Overrides the data converter used to serialize arguments and results. | [DataConverter](https://www.javadoc.io/doc/io.temporal/temporal-sdk/latest/io/temporal/common/converter/DataConverter.html) | |
65+
| :interceptors | Collection of interceptors used to intercept workflow client calls. | [WorkflowClientInterceptor](https://javadoc.io/doc/io.temporal/temporal-sdk/latest/io/temporal/common/interceptors/WorkflowClientInterceptor.html) | |
6366
| :channel | Sets gRPC channel to use. Exclusive with target and sslContext | [ManagedChannel](https://grpc.github.io/grpc-java/javadoc/io/grpc/ManagedChannel.html) | |
6467
| :ssl-context | Sets gRPC SSL Context to use | [SslContext](https://netty.io/4.0/api/io/netty/handler/ssl/SslContext.html) | |
6568
| :enable-https | Sets option to enable SSL/TLS/HTTPS for gRPC | boolean | false |

src/temporal/client/worker.clj

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@
66
[temporal.internal.activity :as a]
77
[temporal.internal.workflow :as w]
88
[temporal.internal.utils :as u])
9-
(:import [io.temporal.worker Worker WorkerFactory WorkerOptions WorkerOptions$Builder]
9+
(:import [io.temporal.worker Worker WorkerFactory WorkerFactoryOptions WorkerFactoryOptions$Builder WorkerOptions WorkerOptions$Builder]
1010
[temporal.internal.dispatcher DynamicWorkflowProxy]
11-
[io.temporal.workflow DynamicWorkflow]))
11+
[io.temporal.workflow DynamicWorkflow]
12+
[io.temporal.common.interceptors WorkerInterceptor]))
1213

1314
(defn ^:no-doc init
1415
"
@@ -27,6 +28,27 @@ Initializes a worker instance, suitable for real connections or unit-testing wit
2728
(reify DynamicWorkflow
2829
(execute [_ args]
2930
(w/execute ctx (:workflows dispatch) args)))))))))
31+
(def worker-factory-options
32+
"
33+
Options for configuring the worker-factory (See [[start]])
34+
35+
| Value | Description | Type | Default |
36+
| ------------ | ----------------------------------------------------------------- | ---------------- | ------- |
37+
| :enable-logging-in-replay | | boolean | false |
38+
| :max-workflow-thread-count | Maximum number of threads available for workflow execution across all workers created by the Factory. | int | 600 |
39+
| :worker-interceptors | Collection of WorkerInterceptors | [WorkerInterceptor](https://javadoc.io/doc/io.temporal/temporal-sdk/latest/io/temporal/common/interceptors/WorkerInterceptor.html) | |
40+
| :workflow-cache-size | To avoid constant replay of code the workflow objects are cached on a worker. This cache is shared by all workers created by the Factory. | int | 600 |
41+
"
42+
43+
{:enable-logging-in-replay #(.setEnableLoggingInReplay ^WorkerFactoryOptions$Builder %1 %2)
44+
:max-workflow-thread-count #(.setMaxWorkflowThreadCount ^WorkerFactoryOptions$Builder %1 %2)
45+
:worker-interceptors #(.setWorkerInterceptors ^WorkerFactoryOptions$Builder %1 (into-array WorkerInterceptor %2))
46+
:workflow-cache-size #(.setWorkflowCacheSize ^WorkerFactoryOptions$Builder %1 %2)})
47+
48+
(defn ^:no-doc worker-factory-options->
49+
^WorkerFactoryOptions [params]
50+
(u/build (WorkerFactoryOptions/newBuilder (WorkerFactoryOptions/getDefaultInstance)) worker-factory-options params))
51+
3052
(def worker-options
3153
"
3254
Options for configuring workers (See [[start]])
@@ -86,19 +108,21 @@ Starts a worker processing loop.
86108
87109
Arguments:
88110
89-
- `client`: WorkflowClient instance returned from [[temporal.client.core/create-client]]
90-
- `options`: Worker start options (See [[worker-options]])
111+
- `client`: WorkflowClient instance returned from [[temporal.client.core/create-client]]
112+
- `options`: Worker start options (See [[worker-options]])
113+
- `factory-options`: WorkerFactory options (See [[worker-factory-options]])
91114
92115
```clojure
93116
(start {:task-queue ::my-queue :ctx {:some \"context\"}})
94117
```
95118
"
96-
[client {:keys [task-queue] :as options}]
97-
(let [factory (WorkerFactory/newInstance client)
98-
worker (.newWorker factory (u/namify task-queue) (worker-options-> options))]
99-
(init worker options)
100-
(.start factory)
101-
{:factory factory :worker worker}))
119+
([client options] (start client options nil))
120+
([client {:keys [task-queue] :as options} factory-options]
121+
(let [factory (WorkerFactory/newInstance client (worker-factory-options-> factory-options))
122+
worker (.newWorker factory (u/namify task-queue) (worker-options-> options))]
123+
(init worker options)
124+
(.start factory)
125+
{:factory factory :worker worker})))
102126

103127
(defn stop
104128
"

src/temporal/testing/env.clj

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,15 @@
33
(ns temporal.testing.env
44
"Methods and utilities to assist with unit-testing Temporal workflows"
55
(:require [temporal.client.worker :as worker]
6+
[temporal.client.core :as client]
67
[temporal.internal.utils :as u])
78
(:import [io.temporal.testing TestWorkflowEnvironment TestEnvironmentOptions TestEnvironmentOptions$Builder]))
89

910
(def ^:no-doc test-env-options
10-
{:metrics-scope #(.setMetricsScope ^TestEnvironmentOptions$Builder %1 %2)})
11+
{:worker-factory-options #(.setWorkerFactoryOptions ^TestEnvironmentOptions$Builder %1 (worker/worker-factory-options-> %2))
12+
:workflow-client-options #(.setWorkflowClientOptions ^TestEnvironmentOptions$Builder %1 (client/client-options-> %2))
13+
:workflow-service-stub-options #(.setWorkflowServiceStubsOptions ^TestEnvironmentOptions$Builder %1 (client/stub-options-> %2))
14+
:metrics-scope #(.setMetricsScope ^TestEnvironmentOptions$Builder %1 %2)})
1115

1216
(defn ^:no-doc test-env-options->
1317
^TestEnvironmentOptions [params]
@@ -25,9 +29,13 @@ Arguments:
2529
2630
#### options map
2731
28-
| Value | Description | Type | Default |
29-
| ------------------------- | --------------------------------------------------------------------------- | ------------ | ------- |
30-
| :metrics-scope | The scope to be used for metrics reporting | [Scope](https://github.com/uber-java/tally/blob/master/core/src/main/java/com/uber/m3/tally/Scope.java) | |
32+
| Value | Description | Type | Default |
33+
| ------------------------- | --------------------------------------------- | ------------ | ------- |
34+
| :worker-factory-options | | [[worker/worker-factory-options]] | |
35+
| :workflow-client-options | | [[client/client-options]] | |
36+
| :workflow-service-stub-options | | [[client/stub-options]] | |
37+
| :metrics-scope | The scope to be used for metrics reporting | [Scope](https://github.com/uber-java/tally/blob/master/core/src/main/java/com/uber/m3/tally/Scope.java) | |
38+
3139
3240
"
3341
([]

test/temporal/test/tracing.clj

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
;; Copyright © Manetu, Inc. All rights reserved
2+
3+
(ns temporal.test.tracing
4+
(:require [clojure.test :refer :all]
5+
[taoensso.timbre :as log]
6+
[temporal.client.core :as c]
7+
[temporal.testing.env :as e]
8+
[temporal.workflow :refer [defworkflow]])
9+
(:import [io.temporal.opentracing OpenTracingClientInterceptor OpenTracingWorkerInterceptor]))
10+
11+
;; do not use the shared fixture, since we want to control the env creation
12+
13+
;;-----------------------------------------------------------------------------
14+
;; Data
15+
;;-----------------------------------------------------------------------------
16+
(defonce state (atom {}))
17+
18+
(def task-queue ::default)
19+
20+
;;-----------------------------------------------------------------------------
21+
;; Utilities
22+
;;-----------------------------------------------------------------------------
23+
(defn get-client []
24+
(get @state :client))
25+
26+
(defn create-workflow [workflow]
27+
(c/create-workflow (get-client) workflow {:task-queue task-queue}))
28+
29+
(defn invoke [workflow]
30+
(let [i (create-workflow workflow)]
31+
(c/start i {})
32+
@(c/get-result i)))
33+
34+
;;-----------------------------------------------------------------------------
35+
;; Workflows
36+
;;-----------------------------------------------------------------------------
37+
38+
(defworkflow traced-workflow
39+
[ctx {:keys [args]}]
40+
(log/info "traced-workflow:" args)
41+
:ok)
42+
43+
;;-----------------------------------------------------------------------------
44+
;; Fixtures
45+
;;-----------------------------------------------------------------------------
46+
(defn create-service []
47+
(let [env (e/create {:workflow-client-options {:interceptors [(OpenTracingClientInterceptor.)]}
48+
:worker-factory-options {:worker-interceptors [(OpenTracingWorkerInterceptor.)]}})
49+
client (e/get-client env)]
50+
(e/start env {:task-queue task-queue})
51+
(swap! state assoc
52+
:env env
53+
:client client)))
54+
55+
(defn destroy-service []
56+
(swap! state
57+
(fn [{:keys [env] :as s}]
58+
(e/stop env)
59+
(dissoc s :env :client))))
60+
61+
(defn wrap-service [test-fn]
62+
(create-service)
63+
(test-fn)
64+
(destroy-service))
65+
66+
(use-fixtures :once wrap-service)
67+
68+
;;-----------------------------------------------------------------------------
69+
;; Tests
70+
;;-----------------------------------------------------------------------------
71+
72+
(deftest the-test
73+
(testing "Verifies that we can invoke our traced workflow" ;; todo: verify that tracing is working
74+
(is (-> (invoke traced-workflow) (= :ok)))))

0 commit comments

Comments
 (0)