diff --git a/src/main/clojure/clojure/core/async/flow.clj b/src/main/clojure/clojure/core/async/flow.clj index c1b8833..3beb519 100644 --- a/src/main/clojure/clojure/core/async/flow.clj +++ b/src/main/clojure/clojure/core/async/flow.clj @@ -83,7 +83,7 @@ ::flow/report-chan - a core.async chan for reading.'ping' reponses will show up here, as will any explicit ::flow/report outputs - from :transform/:inject + from :transform/:introduce ::flow/error-chan - a core.async chan for reading. Any (and only) exceptions thrown anywhere on any thread inside a flow will appear @@ -138,7 +138,7 @@ "Given a map of functions (described below), returns a launcher that creates a process compliant with the process protocol (see the spi/ProcLauncher doc). The possible entries for process-impl-map - are :describe, :init, :transition, :transform and :inject. This is + are :describe, :init, :transition, :transform and :introduce. This is the core facility for defining the logic for processes via ordinary functions. @@ -171,7 +171,7 @@ process will no longer be used following that. See the SPI for details. state' will be the state supplied to subsequent calls. - Exactly one of either :transform or :inject are required. + Exactly one of either :transform or :introduce are required. :transform - (state in-name msg) -> [state' output] where output is a map of outid->[msgs*] @@ -184,19 +184,19 @@ may never be nil (per core.async channels). state' will be the state supplied to subsequent calls. - :inject - (state) -> [state' output] + :introduce - (state) -> [state' output] where output is a map of outid->[msgs*], per :transform - The inject fn is used for sources - proc-impls that inject new data + The introduce fn is used for sources - proc-impls that introduce new data into the flow by doing I/O with something external to the flow and - feeding that data to its outputs. A proc-impl specifying :inject may not + feeding that data to its outputs. A proc-impl specifying :introduce may not specify any :ins in its descriptor, as none but the ::flow/control channel - will be read. Instead, inject will be called every time through the + will be read. Instead, introduce will be called every time through the process loop, and will presumably do blocking or paced I/O to get new data to return via its outputs. If it does blocking I/O it should do so with a timeout so it can regularly return to the process loop which can then look for control messages - it's fine - for inject to return with no output. Do not spin poll in the inject + for introduce to return with no output. Do not spin poll in the introduce fn. proc accepts an option map with keys: @@ -205,13 +205,13 @@ will be used when getting the return from the future - see below The :compute context is not allowed for proc impls that - provide :inject (as I/O is presumed). + provide :introduce (as I/O is presumed). In the :exec context of :mixed or :io, this dictates the type of thread in which the process loop will run, _including its calls to - transform/inject_. + transform/introduce_. - When :io is specified transform/inject should not do extensive computation. + When :io is specified transform/introduce should not do extensive computation. When :compute is specified (only allowed for :transform), each call to transform will be run in a separate thread. The process loop will diff --git a/src/main/clojure/clojure/core/async/flow/impl.clj b/src/main/clojure/clojure/core/async/flow/impl.clj index 8332b68..fdbf4db 100644 --- a/src/main/clojure/clojure/core/async/flow/impl.clj +++ b/src/main/clojure/clojure/core/async/flow/impl.clj @@ -202,7 +202,7 @@ (loop [nstatus nstatus, nstate nstate, msgs (seq msgs)] (if (or (nil? msgs) (= nstatus :exit)) [nstatus nstate] - (let [m (if-some [m (first msgs)] m (throw "messages must be non-nil")) + (let [m (if-some [m (first msgs)] m (throw (Exception. "messages must be non-nil"))) [v c] (async/alts!! [control [outc m]] :priority true)] @@ -216,10 +216,10 @@ (defn proc "see lib ns for docs" - [{:keys [describe init transition transform inject] :as impl} {:keys [exec compute-timeout-ms]}] + [{:keys [describe init transition transform introduce] :as impl} {:keys [exec compute-timeout-ms]}] ;;validate the preconditions - (assert (= 1 (count (keep identity [transform inject]))) "must provide exactly one of :transform or :inject") - (assert (not (and inject (= exec :compute))) "can't specify :inject and :compute") + (assert (= 1 (count (keep identity [transform introduce]))) "must provide exactly one of :transform or :introduce") + (assert (not (and introduce (= exec :compute))) "can't specify :introduce and :compute") (reify clojure.core.protocols/Datafiable (datafy [_] @@ -228,7 +228,7 @@ spi/ProcLauncher (describe [_] (let [{:keys [params ins] :as desc} (describe)] - (assert (not (and ins inject)) "can't specify :ins when :inject") + (assert (not (and ins introduce)) "can't specify :ins when :introduce") (assert (or (not params) init) "must have :init if :params") desc)) (start [_ {:keys [pid args ins outs resolver]}] @@ -261,7 +261,7 @@ ;;:running (let [[msg c] (if transform (async/alts!! read-chans :priority true) - ;;inject + ;;introduce (when-let [msg (async/poll! control)] [msg control])) cid (io-id c)] @@ -272,7 +272,7 @@ (try (let [[nstate outputs] (if transform (transform state cid msg) - (inject state)) + (introduce state)) [nstatus nstate] (send-outputs status nstate outputs outs resolver control handle-command transition)] [nstatus nstate (inc count)])