wilton.js  v202103141
Channel Namespace Reference

wilton/Channel
Pipes that connect concurrent threads.

Functions

Object Channel (String name, Number|Undefined size, Function|Undefined callback)
 Create Channel instance.
Undefined close (Function|Undefined callback)
 Close the channel releasing native resources.
static String dumpRegistry (Function|Undefined callback)
 Dump a registry of currently active channels.
static Object lookup (String name, Function|Undefined callback)
 Look for the existing channel.
Number maxSize (Function|Undefined callback)
 Max number of buffered messages.
String name (Function|Undefined callback)
 Name of this channel.
Boolean offer (Object|String msg, Function|Undefined callback)
 Send a message to the channel, return false if channel is full.
Object peek (Function|Undefined callback)
 Copy pending message without removing it from channel.
Object poll (Function|Undefined callback)
 Poll channel for buffered messages.
Object receive (Number|Undefined timeoutMillis, Function|Undefined callback)
 Receive a message from the channel, blocks if channel is empty.
Undefined receiveAndClose (Function|Undefined callback)
 Receive a message and close the channel after that.
static Number select (Array channels, Number|Undefined timeoutMillis, Function|Undefined callback)
 Wait for input data on multiple channels simultaneously.
Boolean send (Object|String msg, Number|Undefined timeoutMillis, Function|Undefined callback)
 Send a message to the channel, blocks if channel is full.
Any synchronize (Function operations, Function|Undefined callback)
 Run specified operations in synchronized mode.

Detailed Description

This module provides "pipes" (FIFO queues) that can be used to transfer data between different threads of execution. Data is transferred in JSON format.

Data can be sent and received in blocking mode (send() and receive() methods) or non-blocking mode (offer() and poll() methods).

Multiple threads can write to and read from the same Channel at the same time.

Channels can be "buffered" (with some amount of intermediate storage) or synchronous (without intermediate storage). Only blocking methods can be used on synchronous channels.

Readers can wait for data on multiple channels simultaneously using the select() function.

Channels are similar in nature to Go channels.

Underlying system resources can be released manually using the close() method. Channels that are still open are closed during shutdown.

Usage example:

// thread 1

// create buffered channel
var chan = new Channel("channel1", 1024);
// create sync channel
var sync = new Channel("channel2");
// send message, blocks if channel is full
chan.send({
    foo: 42
});
// offer message, returns false if channel is full
chan.offer({
    bar: 43
});

// thread 2

// lookup existing channels from other thread
var chan = Channel.lookup("channel1");
var sync = Channel.lookup("channel2");
// receive message, blocks if channel is empty
var obj1 = chan.receive();
// poll for message, returns null if channel is empty
var obj2 = chan.poll();
// copy pending message without removing it from channel
var obj3 = chan.peek();
// wait on multiple channels, returns the index of channel that has data ready
var idx = Channel.select([chan, sync], 10000);
// close channels
chan.close();
sync.close();

Function Documentation

Object Channel::Channel ( String  name,
Number|Undefined  size,
Function|Undefined  callback 
)

Creates a channel instance, allocating resources for intermediate storage. If the size parameter is not specified, a synchronous channel is created.

Created channel can be accessed in other threads using lookup() function.

Parameters
name: String, unique name to identify this channel
size: Number|Undefined, maximum number of elements allowed in the intermediate storage; storage is not pre-allocated and grows up to this number as needed; value 0 creates a synchronous channel; default value: 0
callback: Function|Undefined, callback to receive result or error
Returns
Object Channel instance
Undefined Channel::close ( Function|Undefined  callback)

Closes the channel. All waiting threads are unblocked with a negative result (false) for writes and a null result for reads.

Parameters
callback: Function|Undefined, callback to receive result or error
Returns
Undefined
static String Channel::dumpRegistry ( Function|Undefined  callback)

Dumps a registry of currently active channels as a string.

Parameters
callback: Function|Undefined, callback to receive result or error
Returns
String registry dump
static Object Channel::lookup ( String  name,
Function|Undefined  callback 
)

Looks up an existing channel that may have been created in this or another thread.

Returned channel instance is different from the original one (as different threads cannot share live JavaScript objects), but is linked (through handle) to the same native instance (that holds intermediate storage for this channel).

Parameters
name: String, channel name
callback: Function|Undefined, callback to receive result or error
Returns
Object Channel instance that corresponds to native channel with a specified name; throws Error if channel not found
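
Because lookup() throws when no channel is registered under the given name, a caller that is not sure whether the channel exists yet may prefer a null result. The following is a minimal sketch: lookupOrNull is a hypothetical helper name, not part of this module, and the Channel constructor is assumed to be passed in by the caller after loading the module.

```javascript
// Hypothetical helper, not part of wilton/Channel.
// Channel: the constructor exported by the module, supplied by the caller.
function lookupOrNull(Channel, name) {
    try {
        return Channel.lookup(name);
    } catch (e) {
        // lookup() throws an Error when no channel with this name is found
        return null;
    }
}
```

Note that this sketch swallows every error raised by lookup(), not only the "not found" case, so it is suited to probing rather than to diagnostics.
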
Number Channel::maxSize ( Function|Undefined  callback)

Returns the buffer size for a buffered Channel and 0 for a synchronous one.

Parameters
callback: Function|Undefined, callback to receive result or error
Returns
Number buffer size for a buffered Channel and 0 for a synchronous one
String Channel::name ( Function|Undefined  callback)

Returns the name of this channel. The channel must be alive (not closed).

Parameters
callback: Function|Undefined, callback to receive result or error
Returns
String channel name
Boolean Channel::offer ( Object|String  msg,
Function|Undefined  callback 
)

Sends a message to the channel in non-blocking mode, returning immediately.

Always returns false on a synchronous channel.

Parameters
msg: Object|String, message to send
callback: Function|Undefined, callback to receive result or error
Returns
Boolean true if message was sent successfully, false if channel is full or channel was closed (manually with close() or automatically on shutdown)
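
Since offer() never blocks, the caller decides what to do with a rejected message. The sketch below simply counts rejections; offerOrCount and the stats object are hypothetical names introduced for illustration, and chan is assumed to be a buffered Channel instance.

```javascript
// Hypothetical helper, not part of wilton/Channel.
// chan: buffered Channel instance; stats: plain object owned by the caller.
function offerOrCount(chan, msg, stats) {
    var accepted = chan.offer(msg);
    if (!accepted) {
        // false means the buffer is full or the channel was closed
        stats.dropped += 1;
    }
    return accepted;
}
```

A false result does not distinguish a full buffer from a closed channel, so a counter like this cannot tell the two cases apart.
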
Object Channel::peek ( Function|Undefined  callback)

Tries to copy a pending buffered message from the channel in a non-blocking mode, returns null if channel is empty.

Can be used on synchronous channels (producer thread will NOT be unblocked on peek()).

Parameters
callback: Function|Undefined, callback to receive result or error
Returns
Object pending message parsed from JSON, null if channel is empty or channel was closed (manually with close() or automatically on shutdown)
Object Channel::poll ( Function|Undefined  callback)

Tries to receive a message from the channel in a non-blocking mode, returns null if channel is empty.

Always returns null on a synchronous channel.

Parameters
callback: Function|Undefined, callback to receive result or error
Returns
Object received message parsed from JSON, null if channel is empty or channel was closed (manually with close() or automatically on shutdown)
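
A common use of poll() is to collect whatever is currently buffered without waiting for more. The following sketch assumes a buffered Channel instance; drain and its limit parameter are hypothetical names, not part of this module.

```javascript
// Hypothetical helper, not part of wilton/Channel.
// Collects up to `limit` buffered messages without blocking.
function drain(chan, limit) {
    var messages = [];
    while (messages.length < limit) {
        var msg = chan.poll();
        if (null === msg) {
            // channel is empty (or closed), nothing more to collect
            break;
        }
        messages.push(msg);
    }
    return messages;
}
```

The limit keeps the loop bounded when producers in other threads keep writing while the channel is being drained.
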
Object Channel::receive ( Number|Undefined  timeoutMillis,
Function|Undefined  callback 
)

Receives a JSON message from the channel in blocking mode.

Parameters
timeoutMillis: Number|Undefined, max timeout for waiting, in milliseconds; default value: 0 (infinite timeout)
callback: Function|Undefined, callback to receive result or error
Returns
Object received message parsed from JSON, null if channel was closed (manually with close() or automatically on shutdown)
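
With the default (infinite) timeout, a null result from receive() signals that the channel was closed, which makes it a natural loop terminator for a consumer thread. A minimal sketch follows; consume and handler are hypothetical names introduced for illustration.

```javascript
// Hypothetical helper, not part of wilton/Channel.
// Handles messages one by one until receive() returns null.
function consume(chan, handler) {
    var count = 0;
    var msg;
    // no timeout is passed, so null here means the channel was closed
    while (null !== (msg = chan.receive())) {
        handler(msg);
        count += 1;
    }
    return count;
}
```

The producer side can then stop such a consumer by calling close() on its own instance of the same channel.
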
Undefined Channel::receiveAndClose ( Function|Undefined  callback)

Shortcut method to perform receive() and close() in one go. Received message is discarded. Equivalent to:

chan.receive();
chan.close();
Parameters
callback: Function|Undefined, callback to receive result or error
Returns
Undefined
static Number Channel::select ( Array  channels,
Number|Undefined  timeoutMillis,
Function|Undefined  callback 
)

Blocks the current thread until one of the specified channels becomes ready for reading.

This function is similar in nature to POSIX select and Go's select, but it supports only "ready for read" logic ("ready for write" is not supported).

Parameters
channels: Array, list of channels to wait on
timeoutMillis: Number|Undefined, max timeout for waiting, in milliseconds; default value: 0 (infinite timeout)
callback: Function|Undefined, callback to receive result or error
Returns
Number index of the channel in specified list, that is ready for reading, -1 on timeout
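
The returned index is typically used to pick the channel to read from. The sketch below combines select() with receive(); receiveAny is a hypothetical helper name, the Channel constructor is assumed to be passed in by the caller, and the current thread is assumed to be the only reader of these channels.

```javascript
// Hypothetical helper, not part of wilton/Channel.
// Waits on several channels and reads from the one that became ready.
function receiveAny(Channel, channels, timeoutMillis) {
    var idx = Channel.select(channels, timeoutMillis);
    if (-1 === idx) {
        // no channel became ready before the timeout expired
        return null;
    }
    return {
        index: idx,
        message: channels[idx].receive()
    };
}
```

If other threads read from the same channels, the message may be taken between the two calls, in which case receive() blocks until the next message arrives.
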
Boolean Channel::send ( Object|String  msg,
Number|Undefined  timeoutMillis,
Function|Undefined  callback 
)

Sends a JSON message to the channel in blocking mode.

Parameters
msg: Object|String, message to send
timeoutMillis: Number|Undefined, max timeout for waiting, in milliseconds; default value: 0 (infinite timeout)
callback: Function|Undefined, callback to receive result or error
Returns
Boolean true if message was sent successfully, false if channel was closed (manually with close() or automatically on shutdown)
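
A producer that writes a batch of messages can use the Boolean result to stop early. The following sketch uses a hypothetical helper named sendAll. The reference above lists only a closed channel as the cause of a false result; that an expired timeout is reported the same way is an assumption, and the sketch stops on any false result regardless of the cause.

```javascript
// Hypothetical helper, not part of wilton/Channel.
// Sends messages in order and stops at the first one that is not accepted.
function sendAll(chan, messages, timeoutMillis) {
    var sent = 0;
    for (var i = 0; i < messages.length; i++) {
        if (!chan.send(messages[i], timeoutMillis)) {
            // false: the channel was closed (assumed also for a timeout)
            break;
        }
        sent += 1;
    }
    return sent;
}
```

The return value is the number of messages accepted, so the caller can retry or log the remainder starting at that index.
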
Any Channel::synchronize ( Function  operations,
Function|Undefined  callback 
)

Runs the specified operations while holding a mutually exclusive lock, making it impossible for two concurrent invocations (from different threads) of synchronized operations on the same channel to interleave.

The Channel instance must be a buffered channel with maxSize=1 and must be empty.

If the Channel is being destroyed (e.g. during shutdown), this method exits without running the operations function and without throwing an error.

Parameters
operations: Function, function to run in synchronized mode
callback: Function|Undefined, callback to receive result or error
Returns
Any return value from operations function
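
A channel used this way acts as a cross-thread lock around a read-modify-write sequence. The sketch below assumes lock is an empty buffered Channel created with size 1; incrementShared and the store object (with read() and write() methods that access some resource visible to all threads, for example a file) are hypothetical names introduced for illustration.

```javascript
// Hypothetical helper, not part of wilton/Channel.
// lock: empty buffered Channel with maxSize=1, used only for locking
// store: hypothetical object with read() and write(value) methods
function incrementShared(lock, store) {
    return lock.synchronize(function() {
        // no other thread can run synchronized operations
        // on the same channel while this function executes
        var next = store.read() + 1;
        store.write(next);
        return next;
    });
}
```

Every thread must go through synchronize() on the same named channel for the exclusion to hold. The reference above does not specify the return value when the channel is being destroyed and the operations are skipped.
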