The core.async library adds support for asynchronous programming using channels.
Channels are queues that carry values and support multiple writers and readers. Channels are created with
chan. Values in a channel are stored in a buffer. Buffers are never unbounded and there are several provided buffer types:
(chan) - no buffer is used, and a rendezvous is required to pass a value through the channel from writer to reader
Fixed size -
(chan (dropping-buffer 10)) - fixed size, and when full drop newest value
(chan (sliding-buffer 10)) - fixed size, and when full drop oldest value
Channels are first-class values that can be passed around like any other value.
Channels may optionally be supplied with a transducer and an exception handler. The transducer will be applied to values that pass through the channel. If a transducer is supplied, the channel must be buffered (transducers can create intermediate values that must be stored somewhere).
ex-handler is a function of one argument (a Throwable). If an exception occurs while applying the transducer, the
ex-handler will be invoked, and any non-nil return value will be placed in the channel. If no
ex-handler is supplied, exceptions will flow and be handled where they occur (note that this may in either the writer or reader thread depending on the operation and the state of the buffer).
Any value can be placed on a channel, except
nil. The primary operations on channels are put and take, which are provided in several variants:
As a mnemonic, the
> points in the direction the value travels relative to the channel arg. For example, in
(>!! chan val) the
> points into the channel (put) and
(<!! chan) points out of the channel (take).
The use case dictates the variant to use. Parking operations are only valid in
go blocks (see below for more) and never valid outside the lexical scope of a
go. Conversely, blocking operations should only be used outside
The async and non-blocking forms are less common but may be used in either context. Use the async variants to specify a channel and a function that is called when the take or put succeeds. The
put! functions also take an optional flag
on-caller? to indicate whether the fn can be called on the current thread. The non-blocking
poll! will either complete or return immediately.
Channels are closed with
close!. When a channel is closed, no values may be added, but values already in the channel may be taken. When all values are drained from a closed channel, take operations will return
nil (these are not valid values and serve as a marker).
alts! (parking) and
alts!! (blocking) can be used to wait on a set of channel operations until one succeeds. Channel operations can be either a put (with a value) or a take. By default, if more than one operation becomes available, they are chosen in random order, but set
:priority true to order a preference. Only one of the operations will occur. If no operation is available and a
:default val is specified, the default value will be returned instead.
Since it is common to combine an
alts with a conditional return based on the action chosen,
alt! (parking) and
alt!! (blocking) combine an
alts! select with destructuring of the channel and value and a result expression.
Promise channels are special channels that will accept only a single value. Once a value is put to a promise channel, all pending and future consumers will receive only that value. Future puts complete but drop the value. When the channel is closed, consumers will receive either the value (if a put occurred) or nil (if no put occurred) forever.
Promise channels: promise-chan
"Processes", in the most general sense, are represented either as go blocks or threads. Go blocks model a lightweight computation that can be "parked" (paused) without consuming a thread. Go blocks communicate externally via channels. Any core.async parking operation (
alts!) that cannot be immediately completed will cause the block to park and it will be automatically resumed when the operation can complete (when data has arrived on a channel to allow it).
Note that go blocks are multiplexed over a finite number of threads and should never be blocked, either by the use of a core.async blocking operation (like
<!!) or by calling a blockable I/O operation like a network call. Doing so may effectively block all of the threads in the go block pool and prevent further progress.
core.async provides the helper functions
thread-call (analogous to
future-call) to execute a process asynchronously in a separate thread. As these threads are not limited, they are suitable for blocking operations and can communicate with other processes via channels. However, note that these threads are not special - you can create and manage your own threads in any way you like and use core.async channels from those threads to communicate.
pipeline function (and variants) are designed for modeling your work as a pipeline of multi-threaded processing stages. The stages are connected by channels and each stage has N threads performing transducer xf as values flow from the from channel to the to channel. The variants are:
pipeline - the work performed in the xf must not block (designed for computational parallelism). The transducer will be applied independently to each value, in parallel, so stateful transducer functions will likely not be useful.
pipeline-blocking - the work performed in the xf may block, for example on network operations.
pipeline-async - this variant triggers asynchronous work in another system or thread and expects another thread to place the results on a return channel.
go blocks are dispatched over an internal fixed size thread pool, which defaults to 8 threads. The size of this pool can be modified using the Java system property
Set the Java system property
clojure.core.async.go-checking to true to validate
go blocks do not invoke core.async blocking operations. Property is read once, at namespace load time. Recommended for use primarily during development. Invalid blocking calls will throw in go block threads - use
Thread.setDefaultUncaughtExceptionHandler() to catch and handle such exceptions.
Because the core.async go block thread pool is fixed size, blocking IO operations should never be done in go blocks. If all go threads are blocked on blocking operations, you may experience either deadlock or lack of progress.
One common issue is the use of core.async blocking operations inside go blocks. core.async includes a debugging facility to detect this situation (other kinds of blocking operation cannot be detected so this covers only part of the problem). To enable go checking, set the Java system property
clojure.core.async.go-checking=true. This property is read once, at namespace load time, and should be used in development or testing, not in production.
When go checking is active, invalid blocking calls in a go block will throw in go block threads. By default, these will likely throw to the go block thread’s uncaught exception handler and be printed, but you can use
Thread/setDefaultUncaughtExceptionHandler to change the default behavior (or depending on your system, you may have one already that routes to logging).