Clojure

Agents and Asynchronous Actions

Like Refs, Agents provide shared access to mutable state. Where Refs support coordinated, synchronous change of multiple locations, Agents provide independent, asynchronous change of individual locations. Agents are bound to a single storage location for their lifetime, and only allow mutation of that location (to a new state) to occur as a result of an action. Actions are functions (with, optionally, additional arguments) that are asynchronously applied to an Agent’s state and whose return value becomes the Agent’s new state. Because actions are functions, they can also be multimethods, and actions are therefore potentially polymorphic. Also, because the set of functions is open, the set of actions supported by an Agent is also open, in sharp contrast to the pattern-matching message-handling loops provided by some other languages.
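As a minimal sketch of the ideas above, the following shows a plain function and a multimethod both used as actions on the same Agent. The names account, deposit and apply-event, and the shape of the event maps, are hypothetical and chosen only for illustration.

```clojure
;; An Agent whose state is an immutable map.
(def account (agent {:balance 0}))

;; A plain function used as an action. Its first argument is the Agent's
;; current state; the remaining arguments come from the send call.
(defn deposit [state amount]
  (update state :balance + amount))

;; A multimethod used as an action; it dispatches on the event's :type.
(defmulti apply-event (fn [_state event] (:type event)))

(defmethod apply-event :deposit [state {:keys [amount]}]
  (update state :balance + amount))

(defmethod apply-event :withdraw [state {:keys [amount]}]
  (update state :balance - amount))

;; Both dispatches return immediately; the actions run later on a pool thread.
(send account deposit 100)
(send account apply-event {:type :withdraw :amount 30})

;; await blocks until the actions dispatched so far from this thread have run.
(await account)
@account ;=> {:balance 70}
```

New kinds of events can be supported later by adding defmethod forms, without touching the Agent or any central message loop.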

Clojure’s Agents are reactive, not autonomous: there is no imperative message loop and no blocking receive. The state of an Agent should itself be immutable (preferably an instance of one of Clojure’s persistent collections), and the state of an Agent is always immediately available for reading by any thread (using the deref function or reader macro @) without sending any messages, i.e. observation does not require cooperation or coordination.
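To illustrate that reading never waits on pending work, here is a small sketch in which an Agent is read while a deliberately slow action is still in flight. The name slow and the 200 ms sleep are assumptions made for the demonstration.

```clojure
(def slow (agent 0))

;; A deliberately slow action. It sleeps, so send-off is used rather than send.
(send-off slow (fn [n] (Thread/sleep 200) (inc n)))

;; deref does not wait for the pending action. It returns whatever state the
;; Agent holds right now, which is most likely still the old value here.
(def seen-immediately @slow)

;; Blocking is opt-in: await waits for the action dispatched above.
(await slow)
(def seen-after-await @slow) ; 1
```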

Agent action dispatches take the form (send agent fn args*). send (and send-off) always returns immediately. At some point later, in another thread, the following will happen:

  1. The given fn will be applied to the state of the Agent and the args, if any were supplied.

  2. The return value of fn will be passed to the validator function, if one has been set on the Agent. See set-validator! for details.

  3. If the validator succeeds or if no validator was given, the return value of the given fn will become the new state of the Agent.

  4. If any watchers were added to the Agent, they will be called. See add-watch for details.

  5. If during the function execution any other dispatches are made (directly or indirectly), they will be held until after the state of the Agent has been changed.
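The steps above can be traced with a short sketch. It records events in an atom so that the order of the action, the watcher calls and a nested dispatch is visible. The names events, counter, done and bump-twice are hypothetical; the promise is only a convenient way to wait for the nested action, since await does not cover dispatches made later from inside an action.

```clojure
(def events (atom []))   ; records what happened, in order
(def done (promise))     ; delivered once the nested action has run
(def counter (agent 0))

;; Step 2: the validator is called with each proposed new state.
(set-validator! counter (fn [new-state] (>= new-state 0)))

;; Step 4: a watcher receives its key, the Agent, the old and the new state.
(add-watch counter :recorder
           (fn [_key _agent old new]
             (swap! events conj [:changed old new])
             (when (= new 2) (deliver done true))))

(defn bump-twice [n]
  ;; Step 5: this nested dispatch is held until the state has been changed.
  ;; *agent* is bound to the Agent currently running the action.
  (send *agent* inc)
  (swap! events conj [:bump-twice-running n])
  (inc n))

(send counter bump-twice)

;; Wait (up to 5 seconds) for the nested inc to finish.
(deref done 5000 :timed-out)

@counter ;=> 2
@events  ;=> [[:bump-twice-running 0] [:changed 0 1] [:changed 1 2]]
```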

If any exceptions are thrown by an action function, no nested dispatches will occur, and the exception will be cached in the Agent itself. When an Agent has an error cached, any subsequent dispatch to it (send or send-off) will immediately throw an exception until the Agent is restarted. The cached error can be examined with agent-error, and the Agent restarted with restart-agent.
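A rough sketch of this failure cycle follows, assuming the default error mode (no error handler set). The Agent name fragile and the polling helper wait-for-error are hypothetical; the helper exists only because the failure happens asynchronously on another thread.

```clojure
(def fragile (agent 10))

;; Hypothetical helper: poll until the Agent reports an error or time runs out.
(defn wait-for-error [a timeout-ms]
  (let [deadline (+ (System/currentTimeMillis) timeout-ms)]
    (loop []
      (or (agent-error a)
          (when (< (System/currentTimeMillis) deadline)
            (Thread/sleep 10)
            (recur))))))

;; This action throws ArithmeticException (division by zero).
(send fragile (fn [n] (/ n 0)))

;; The exception is cached in the Agent and can be inspected.
(def cached-error (wait-for-error fragile 2000))

;; While the error is cached, further dispatches are rejected.
(def send-rejected?
  (try
    (send fragile inc)
    false
    (catch Exception _ true)))

;; restart-agent clears the error and installs a new state.
(restart-agent fragile 0)
(send fragile inc)
(await fragile)
@fragile ;=> 1
```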

The actions of all Agents get interleaved amongst threads in a thread pool. At any point in time, at most one action for each Agent is being executed. Actions dispatched to an agent from another single agent or thread will occur in the order they were sent, potentially interleaved with actions dispatched to the same agent from other sources. send should be used for actions that are CPU limited, while send-off is appropriate for actions that may block on IO.
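As an illustration of the ordering guarantee and of the choice between send and send-off, the sketch below dispatches several actions to one Agent from a single thread. The name journal is hypothetical, and the sleep merely stands in for blocking IO.

```clojure
(def journal (agent []))

;; CPU-limited actions: dispatched with send.
(dotimes [i 5]
  (send journal conj (* i i)))

;; An action that may block (the sleep stands in for IO): dispatched with send-off.
(send-off journal (fn [entries]
                    (Thread/sleep 50)
                    (conj entries :flushed)))

(await journal)

;; All six actions came from the same thread, so they ran in dispatch order.
@journal ;=> [0 1 4 9 16 :flushed]
```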

Agents are integrated with the STM - any dispatches made in a transaction are held until it commits, and are discarded if it is retried or aborted.
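A small sketch of this behaviour: a dispatch made inside dosync reaches the Agent only if the transaction commits. The names balance, audit and withdraw! are hypothetical, and throwing an exception is used here as one simple way to abort a transaction.

```clojure
(def balance (ref 100))
(def audit (agent []))

(defn withdraw! [amount]
  (dosync
    ;; This dispatch is held until the transaction commits.
    (send audit conj [:withdrew amount])
    (when (< @balance amount)
      ;; Throwing aborts the transaction, so the held dispatch is discarded.
      (throw (ex-info "insufficient funds" {:requested amount})))
    (alter balance - amount)))

;; Commits: the audit entry is dispatched after the commit.
(withdraw! 30)

;; Aborts: neither the balance nor the audit trail changes.
(def second-attempt
  (try
    (withdraw! 500)
    (catch clojure.lang.ExceptionInfo _ :aborted)))

(await audit)
@balance ;=> 70
@audit   ;=> [[:withdrew 30]]
```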

As with all of Clojure’s concurrency support, no user-code locking is involved.

Note that use of Agents starts a pool of non-daemon background threads that will prevent shutdown of the JVM. Use shutdown-agents to terminate these threads and allow shutdown.
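One common arrangement, sketched here under the assumption of a short-lived program, is to call shutdown-agents in a finally clause once all Agent work is finished. The names tally, count-to and run-and-exit are hypothetical, and run-and-exit is defined but deliberately not called in this snippet.

```clojure
(def tally (agent 0))

(defn count-to
  "Sends n inc actions to tally, waits for them, and returns the final state."
  [n]
  (dotimes [_ n] (send tally inc))
  (await tally)
  @tally)

(defn run-and-exit
  "Hypothetical entry point for a short-lived program."
  []
  (try
    (println "final count:" (count-to 100))
    (finally
      ;; Without this call, the non-daemon pool threads keep the JVM alive.
      ;; No further actions can be dispatched after it.
      (shutdown-agents))))
```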

Example

This example is an implementation of the send-a-message-around-a-ring test. A chain of m agents is created, then a sequence of n actions is dispatched to the head of the chain and relayed through it.

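;; Action: forward message i to the next agent in the chain, if there is one.
;; The last agent has no :next; it reports to the queue when the final
;; message (i = 0) arrives. The state x is returned unchanged.
(defn relay [x i]
  (when (:next x)
    (send (:next x) relay i))
  (when (and (zero? i) (:report-queue x))
    (.put (:report-queue x) i))
  x)

;; Build a chain of m agents, send n messages to its head, and block
;; until the final message has travelled the whole chain.
(defn run [m n]
  (let [q (java.util.concurrent.SynchronousQueue.)
        hd (reduce (fn [next _] (agent {:next next}))
                   (agent {:report-queue q}) (range (dec m)))]
    (doseq [i (reverse (range n))]
      (send hd relay i))
    (.take q)))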

; 1 million message sends:
(time (run 1000 1000))
->"Elapsed time: 2959.254 msecs"

Create an Agent: agent

Examine an Agent: deref (see also the @ reader macro) agent-error error-handler error-mode

Change Agent state: send send-off restart-agent

Block waiting for an Agent: await await-for

Agent validators: set-validator! get-validator

Agent thread management: shutdown-agents
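As a brief illustration of the two blocking functions listed above, the sketch below contrasts await-for, which gives up after a timeout, with await, which waits indefinitely. The name worker and the timings are assumptions chosen so that the timeout is very likely to elapse first.

```clojure
(def worker (agent :idle))

;; A slow action (the sleep stands in for blocking work).
(send-off worker (fn [_] (Thread/sleep 300) :done))

;; await-for takes a timeout in milliseconds and returns logical false
;; if the timeout elapses before the dispatched actions have run.
(def finished-in-time? (await-for 20 worker))

;; await blocks until the actions dispatched so far from this thread have run.
(await worker)
@worker ;=> :done
```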