wilton/Channel
Pipes that connect concurrent threads.
Functions | |
Object | Channel (String name, Number|Undefined size, Function|Undefined callback) |
Create Channel instance. | |
Undefined | close (Function|Undefined callback) |
Close the channel releasing native resources. | |
static String | dumpRegistry (Function|Undefined callback) |
Dump a registry of currently active channels. | |
static Object | lookup (String name, Function|Undefined callback) |
Look for the existing channel. | |
Number | maxSize (Function|Undefined callback) |
Max number of buffered messages. | |
String | name (Function|Undefined callback) |
Name of this channel. | |
Boolean | offer (Object|String msg, Function|Undefined callback) |
Send a message to the channel, return false if channel is full. | |
Object | peek (Function|Undefined callback) |
Copy pending message without removing it from channel. | |
Object | poll (Function|Undefined callback) |
Poll channel for buffered messages. | |
Object | receive (Number|Undefined timeoutMillis, Function|Undefined callback) |
Receive a message from the channel, blocks if channel is empty. | |
Undefined | receiveAndClose (Function|Undefined callback) |
Receive a message and close the channel after that. | |
static Number | select (Array channels, Number|Undefined timeoutMillis, Function|Undefined callback) |
Wait for input data on multiple channels simultaneously. | |
Boolean | send (Object|String msg, Number|Undefined timeoutMillis, Function|Undefined callback) |
Send a message to the channel, blocks if channel is full. | |
Any | synchronize (Function operations, Function|Undefined callback) |
Run specified operations in synchronized mode. |
This module provides "pipes" (FIFO queues) that can be used to transfer data between different threads of execution. Data is transferred in JSON format.
Data can be sent and received in blocking (send() and receive() methods) or non-blocking (offer() and poll() methods) modes.
Multiple threads can write to and read from the same Channel at the same time.
Channels can be "buffered" (with some amount of intermediate storage) or synchronous (without intermediate storage). Only blocking methods can be used on synchronous channels.
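The non-blocking behaviour described above can be sketched with a simplified, single-threaded model. ModelChannel and its fields are hypothetical names used only for illustration; this is not the wilton implementation, and blocking send() and receive() are not modelled. Messages are stored as JSON strings, so the receiver always gets a copy rather than the sender's object.

```javascript
// Simplified single-threaded model of a bounded FIFO channel.
// ModelChannel is a hypothetical name, NOT part of wilton/Channel.
function ModelChannel(maxSize) {
    this.maxSize = maxSize; // 0 models a synchronous channel (no storage)
    this.queue = [];        // pending messages kept as JSON strings
}

// Non-blocking send: returns false when there is no free buffer slot.
ModelChannel.prototype.offer = function(msg) {
    if (this.queue.length >= this.maxSize) {
        return false;
    }
    this.queue.push(JSON.stringify(msg));
    return true;
};

// Non-blocking receive: removes the oldest message, returns null when empty.
ModelChannel.prototype.poll = function() {
    if (0 === this.queue.length) {
        return null;
    }
    return JSON.parse(this.queue.shift());
};

// Copies the oldest message without removing it, returns null when empty.
ModelChannel.prototype.peek = function() {
    if (0 === this.queue.length) {
        return null;
    }
    return JSON.parse(this.queue[0]);
};

// Buffered channel with room for two messages
var buffered = new ModelChannel(2);
var original = { foo: 41 };
var results = [
    buffered.offer(original),    // fits into the buffer
    buffered.offer({ foo: 42 }), // fits into the buffer
    buffered.offer({ foo: 43 })  // buffer is full
];
var peeked = buffered.peek();  // oldest message, stays in the queue
var first = buffered.poll();   // same message, now removed
var second = buffered.poll();
var third = buffered.poll();   // queue is empty

// Synchronous channel: the non-blocking calls never succeed
var sync = new ModelChannel(0);
var syncOffer = sync.offer({ foo: 44 });
var syncPoll = sync.poll();
```

In this model a channel with maxSize 0 has no free slot and no stored message, which matches the rule that only blocking methods are usable on synchronous channels.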
Readers can wait for data on multiple channels simultaneously using the select() function.
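The "ready for read" selection can be illustrated with a non-blocking approximation. In this sketch each channel is modelled as a plain array of pending messages and selectReady is a hypothetical helper, not the wilton function; the real select() blocks until data arrives or the timeout expires, and which index is reported when several channels are ready is an assumption here (the first one in the list).

```javascript
// Non-blocking approximation of "ready for read" selection.
// Each channel is modelled as a plain array of pending messages (assumption).
function selectReady(channels) {
    for (var i = 0; i < channels.length; i++) {
        if (channels[i].length > 0) {
            return i; // index of the first channel that has data
        }
    }
    return -1; // nothing is ready; the real select() reports -1 on timeout
}

var chanA = [];
var chanB = [];
var chanC = [];

var beforeData = selectReady([chanA, chanB, chanC]);

// A producer puts a message into the second channel
chanB.push({ job: 1 });
var afterData = selectReady([chanA, chanB, chanC]);

// The reader takes the message from the channel that was reported ready
var received = [chanA, chanB, chanC][afterData].shift();
```

The returned index is meant to be used to pick the channel from the same list that was passed in, as in the last line of the sketch.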
Channels are similar in nature to Go channels.
Underlying system resources can be released manually using the close() method; channels that are still open are closed automatically during shutdown.
Usage example:
Object Channel::Channel(String name, Number|Undefined size, Function|Undefined callback)
Creates a channel instance, allocating resources for intermediate storage. If the size parameter is not specified, a synchronous channel is created.
The created channel can be accessed in other threads using the lookup() function.
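The relation between a created channel and the instance returned by lookup() can be pictured with a small registry model. Everything in this sketch (nativeRegistry, createChannel, lookupChannel, the shape of the handle) is hypothetical and only illustrates the idea of distinct JavaScript wrappers linked to one shared native state; duplicate names are not handled, and the real handle type is not specified here.

```javascript
// Hypothetical model: one shared "native" state per channel name,
// with a separate JavaScript wrapper object for every user.
var nativeRegistry = {};

function createChannel(name, size) {
    // size is optional; 0 stands for a synchronous channel
    var handle = { name: name, maxSize: size || 0, queue: [] };
    nativeRegistry[name] = handle; // names are assumed to be unique
    return { handle: handle };
}

function lookupChannel(name) {
    if (!Object.prototype.hasOwnProperty.call(nativeRegistry, name)) {
        throw new Error("Channel not found, name: [" + name + "]");
    }
    // a new wrapper object that points to the same shared state
    return { handle: nativeRegistry[name] };
}

var created = createChannel("jobs", 16);
var found = lookupChannel("jobs");

var distinctWrappers = created !== found;          // different JS objects
var sameNative = created.handle === found.handle;  // same underlying state

// A message stored through one wrapper is visible through the other
created.handle.queue.push(JSON.stringify({ id: 1 }));
var visibleCount = found.handle.queue.length;

// Looking up an unknown name raises an error
var lookupError = null;
try {
    lookupChannel("missing");
} catch (e) {
    lookupError = e;
}
```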
name | String unique name to identify this channel |
size | Number|Undefined maximum number of elements allowed in the intermediate storage; storage is not pre-allocated and grows up to this number as needed; value 0 creates a synchronous channel; default value: 0 |
callback | Function|Undefined callback to receive result or error |
Object Channel instance
Undefined Channel::close(Function|Undefined callback)
Closes the channel; all waiting threads are unblocked with a negative result (false) for writes and a null result for reads.
callback | Function|Undefined callback to receive result or error |
Undefined
static String Channel::dumpRegistry(Function|Undefined callback)
Dumps a registry of currently active channels as a string.
callback | Function|Undefined callback to receive result or error |
String registry dump
static Object Channel::lookup(String name, Function|Undefined callback)
Looks for an existing channel that may have been created in this or another thread.
The returned channel instance is different from the original one (as different threads cannot share live JavaScript objects), but is linked (through its handle) to the same native instance (that holds the intermediate storage for this channel).
name | String channel name |
callback | Function|Undefined callback to receive result or error |
Object Channel instance that corresponds to the native channel with the specified name; throws Error if the channel is not found
Number Channel::maxSize(Function|Undefined callback)
String Channel::name(Function|Undefined callback)
Returns the name of this channel; the channel must be alive (not closed).
callback | Function|Undefined callback to receive result or error |
String channel name
Boolean Channel::offer(Object|String msg, Function|Undefined callback)
Sends a message to the channel in non-blocking mode, returning immediately.
Always returns false on a synchronous channel.
msg | Object|String message to send |
callback | Function|Undefined callback to receive result or error |
Boolean true if the message was sent successfully, false if the channel is full or was closed (manually with close() or automatically on shutdown)
Object Channel::peek(Function|Undefined callback)
Tries to copy a pending buffered message from the channel in non-blocking mode; returns null if the channel is empty.
Can be used on synchronous channels (the producer thread will NOT be unblocked on peek()).
callback | Function|Undefined callback to receive result or error |
Object pending message parsed from JSON, null if the channel is empty or was closed (manually with close() or automatically on shutdown)
Object Channel::poll(Function|Undefined callback)
Tries to receive a message from the channel in non-blocking mode; returns null if the channel is empty.
Always returns null on a synchronous channel.
callback | Function|Undefined callback to receive result or error |
Object received message parsed from JSON, null if the channel is empty or was closed (manually with close() or automatically on shutdown)
Object Channel::receive(Number|Undefined timeoutMillis, Function|Undefined callback)
Receives a JSON message from the channel in blocking mode.
timeoutMillis | Number|Undefined max timeout for waiting, in milliseconds, default value: 0 (infinite timeout) |
callback | Function|Undefined callback to receive result or error |
Object received message parsed from JSON, null if the channel was closed (manually with close() or automatically on shutdown)
Undefined Channel::receiveAndClose(Function|Undefined callback)
static Number Channel::select(Array channels, Number|Undefined timeoutMillis, Function|Undefined callback)
Blocks the current thread until one of the specified channels becomes ready for reading.
This function is similar in nature to POSIX select and Go's select, but it supports only "ready for read" logic ("ready for write" is not supported).
channels | Array list of channels to wait on |
timeoutMillis | Number|Undefined max timeout for waiting, in milliseconds, default value: 0 (infinite timeout) |
callback | Function|Undefined callback to receive result or error |
Number index, in the specified list, of the channel that is ready for reading; -1 on timeout
Boolean Channel::send(Object|String msg, Number|Undefined timeoutMillis, Function|Undefined callback)
Sends a JSON message to the channel in blocking mode.
msg | Object|String message to send |
timeoutMillis | Number|Undefined max timeout for waiting, in milliseconds, default value: 0 (infinite timeout) |
callback | Function|Undefined callback to receive result or error |
Boolean true if the message was sent successfully, false if the channel was closed (manually with close() or automatically on shutdown)
Any Channel::synchronize(Function operations, Function|Undefined callback)
Runs the specified operations while holding a mutually exclusive lock, making it impossible for two concurrent invocations (from different threads) of synchronized operations on the same channel to interleave.
The Channel instance must be a buffered channel with maxSize=1 and must be empty.
If the Channel is being destroyed (e.g. during shutdown), this method exits without running the operations function and without throwing an error.
operations | Function function to run in synchronized mode |
callback | Function|Undefined callback to receive result or error |
Any return value from the operations function
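One way to picture why an empty channel with maxSize=1 can act as a lock is a single-slot token model: taking the lock fills the only slot, releasing it empties the slot again. This is an assumption about how such a lock could be built, not a description of wilton's internals; synchronizeModel is a hypothetical helper, and because the sketch is single-threaded a second concurrent entry is reported with an error where a real thread would simply wait.

```javascript
// Hypothetical single-slot lock model; the "channel" is a plain array
// that may hold at most one token.
function synchronizeModel(lock, operations) {
    if (lock.length >= 1) {
        // a real second thread would block here until the slot is free
        throw new Error("lock is already held");
    }
    lock.push("token"); // fill the only slot: the lock is taken
    try {
        return operations();
    } finally {
        lock.shift();   // empty the slot: the lock is released
    }
}

var lock = [];
var counter = 0;
var nestedError = null;

var result = synchronizeModel(lock, function() {
    counter += 1;
    try {
        // an interleaving attempt while the lock is held is rejected
        synchronizeModel(lock, function() {
            counter += 100;
        });
    } catch (e) {
        nestedError = e;
    }
    return counter;
});
```

The try/finally block releases the slot even when operations throws, so a failed run does not leave the model locked.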