Skip to content

Commit

Permalink
:inject in process fn-map is now :introduce, more accurate and avoids…
Browse files Browse the repository at this point in the history
… confusion with flow/inject
  • Loading branch information
richhickey committed Jan 16, 2025
1 parent 3ee4a34 commit 17b0538
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 18 deletions.
22 changes: 11 additions & 11 deletions src/main/clojure/clojure/core/async/flow.clj
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
::flow/report-chan - a core.async chan for reading.'ping' reponses
will show up here, as will any explicit ::flow/report outputs
from :transform/:inject
from :transform/:introduce
::flow/error-chan - a core.async chan for reading. Any (and only)
exceptions thrown anywhere on any thread inside a flow will appear
Expand Down Expand Up @@ -138,7 +138,7 @@
"Given a map of functions (described below), returns a launcher that
creates a process compliant with the process protocol (see the
spi/ProcLauncher doc). The possible entries for process-impl-map
are :describe, :init, :transition, :transform and :inject. This is
are :describe, :init, :transition, :transform and :introduce. This is
the core facility for defining the logic for processes via ordinary
functions.
Expand Down Expand Up @@ -171,7 +171,7 @@
process will no longer be used following that. See the SPI for
details. state' will be the state supplied to subsequent calls.
Exactly one of either :transform or :inject are required.
Exactly one of either :transform or :introduce are required.
:transform - (state in-name msg) -> [state' output]
where output is a map of outid->[msgs*]
Expand All @@ -184,19 +184,19 @@
may never be nil (per core.async channels). state' will be the state
supplied to subsequent calls.
:inject - (state) -> [state' output]
:introduce - (state) -> [state' output]
where output is a map of outid->[msgs*], per :transform
The inject fn is used for sources - proc-impls that inject new data
The introduce fn is used for sources - proc-impls that introduce new data
into the flow by doing I/O with something external to the flow and
feeding that data to its outputs. A proc-impl specifying :inject may not
feeding that data to its outputs. A proc-impl specifying :introduce may not
specify any :ins in its descriptor, as none but the ::flow/control channel
will be read. Instead, inject will be called every time through the
will be read. Instead, introduce will be called every time through the
process loop, and will presumably do blocking or paced I/O to get
new data to return via its outputs. If it does blocking I/O it
should do so with a timeout so it can regularly return to the
process loop which can then look for control messages - it's fine
for inject to return with no output. Do not spin poll in the inject
for introduce to return with no output. Do not spin poll in the introduce
fn.
proc accepts an option map with keys:
Expand All @@ -205,13 +205,13 @@
will be used when getting the return from the future - see below
The :compute context is not allowed for proc impls that
provide :inject (as I/O is presumed).
provide :introduce (as I/O is presumed).
In the :exec context of :mixed or :io, this dictates the type of
thread in which the process loop will run, _including its calls to
transform/inject_.
transform/introduce_.
When :io is specified transform/inject should not do extensive computation.
When :io is specified transform/introduce should not do extensive computation.
When :compute is specified (only allowed for :transform), each call
to transform will be run in a separate thread. The process loop will
Expand Down
14 changes: 7 additions & 7 deletions src/main/clojure/clojure/core/async/flow/impl.clj
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@
(loop [nstatus nstatus, nstate nstate, msgs (seq msgs)]
(if (or (nil? msgs) (= nstatus :exit))
[nstatus nstate]
(let [m (if-some [m (first msgs)] m (throw "messages must be non-nil"))
(let [m (if-some [m (first msgs)] m (throw (Exception. "messages must be non-nil")))
[v c] (async/alts!!
[control [outc m]]
:priority true)]
Expand All @@ -216,10 +216,10 @@

(defn proc
"see lib ns for docs"
[{:keys [describe init transition transform inject] :as impl} {:keys [exec compute-timeout-ms]}]
[{:keys [describe init transition transform introduce] :as impl} {:keys [exec compute-timeout-ms]}]
;;validate the preconditions
(assert (= 1 (count (keep identity [transform inject]))) "must provide exactly one of :transform or :inject")
(assert (not (and inject (= exec :compute))) "can't specify :inject and :compute")
(assert (= 1 (count (keep identity [transform introduce]))) "must provide exactly one of :transform or :introduce")
(assert (not (and introduce (= exec :compute))) "can't specify :introduce and :compute")
(reify
clojure.core.protocols/Datafiable
(datafy [_]
Expand All @@ -228,7 +228,7 @@
spi/ProcLauncher
(describe [_]
(let [{:keys [params ins] :as desc} (describe)]
(assert (not (and ins inject)) "can't specify :ins when :inject")
(assert (not (and ins introduce)) "can't specify :ins when :introduce")
(assert (or (not params) init) "must have :init if :params")
desc))
(start [_ {:keys [pid args ins outs resolver]}]
Expand Down Expand Up @@ -261,7 +261,7 @@
;;:running
(let [[msg c] (if transform
(async/alts!! read-chans :priority true)
;;inject
;;introduce
(when-let [msg (async/poll! control)]
[msg control]))
cid (io-id c)]
Expand All @@ -272,7 +272,7 @@
(try
(let [[nstate outputs] (if transform
(transform state cid msg)
(inject state))
(introduce state))
[nstatus nstate]
(send-outputs status nstate outputs outs resolver control handle-command transition)]
[nstatus nstate (inc count)])
Expand Down

0 comments on commit 17b0538

Please sign in to comment.