How to write a NeuroPype node

A walkthrough guide on how to write your own nodes for Neuropype.

Architecture

A NeuroPype node has the following components:

  1. One or more DataPorts which connect the node to the previous and next nodes in the chain and receive/send the signal data.
  2. Optionally, additional Ports which define the node’s properties and which hold the parameters that can be configured by the user (or programmatically, or by other nodes).
  3. The programming logic/algorithm that is executed on the data passing through the node.

To illustrate this, we’ll walk through the code of the Rereferencing.py node, which can be found in the Examples folder of your install. Unless otherwise specified, the code extracts below are taken from that node. (We’ll also use code from some other nodes to illustrate other use cases.)

Creating a file for the node

NeuroPype nodes are written in Python. In NeuroPype, each node is defined in its own .py file, and the file name needs to match the class name of the node that you're going to define (e.g., Rereferencing.py). This file generally goes into one of the Python packages inside the neuropype/nodes/ folder (e.g., neuropype.nodes.signal_processing). You can also create your own package folder if the existing categories are not a good fit for your node, which is explained in more detail at the bottom of this page.

(Note: On Windows 10, if you plan on adding your own nodes to your Neuropype installation, we recommend installing the Neuropype Suite outside the Program Files folder in order to have write access to the neuropype/nodes folder.)

Imports

All nodes should at least import the engine module:

from ...engine import *

Most nodes also make use of numpy, and may import other libraries as needed for the code:

import numpy as np
from ...utilities.helpers import parse_range

Tip: if you value rapid startup time and a low memory footprint, we recommend that any imports besides numpy, the engine, or Python standard library packages be deferred until the point where they're needed (there is no harm in having an import inside a function that gets called multiple times).
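As a minimal sketch of this deferred-import pattern: the function name robust_center is hypothetical, and the standard-library statistics module merely stands in for a heavier third-party dependency.

```python
import sys


def robust_center(values):
    """Return the median of values, importing the helper module lazily."""
    # The import executes on the first call only; later calls find the
    # module already cached in sys.modules, so repeating it is cheap.
    import statistics
    return statistics.median(values)


result = robust_center([3.0, 1.0, 2.0])
print(result)
```

The cost of loading the module is paid only if and when the function is actually called.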

Class definition

All nodes inherit from the Neuropype Node class (in neuropype.engine.node). While you can define a whole class hierarchy inheriting from Node, we recommend keeping it simple and having one simple subclass per Node if possible, since otherwise the logic at play inside a node can very quickly get hard to reason about. For additional information, we recommend having a look at the reference documentation of the Node base class to see what features it offers and how it is meant to be used.

class Rereferencing(Node):
    """Subtract the average of some reference data."""

In/out data flow

Next, you need to define the node’s DataPort(s).

DataPorts are used to control the flow of data between nodes. A port can be defined as IN (receiving data from the previous node) or OUT (sending data to the next node), or INOUT (both receiving and sending data). The default, if unspecified, is INOUT. A node that is used at the beginning of a chain (such as a node that imports a file or datastream) would have a DataPort that is OUT only, and the DataPort for a node at the end of the chain (such as a node writing to a file or device) would be IN only. For most intermediate processing nodes, having a single INOUT data port is the simplest and easiest to work with, but more complex setups (e.g., forking and merging nodes) can be set up.

DataPorts are only used for transferring data between nodes within a pipeline; transferring data to and from external inputs/outputs (filesystem, LSL stream, etc.) should be handled programmatically in the node. (For an example of this, see the ImportCSV.py node.)

# Rereferencing.py

data = DataPort(Packet, "Data to process.")

Most nodes will have a single DataPort that is INOUT, but nodes can have multiple DataPorts. In this example node, which implements a case switch for routing data to different branches of a pipeline, we define one DataPort for the data coming in and n DataPorts for data going out. We then route the data to the proper OUT port through a setter method that is called when the value of the input port changes.

# ConditionSwitch.py

input = DataPort(object, "Input condition.", IN, setifequal=False)

value1 = Port(None, object, """Value to compare to. Usually a scalar.""")
out1 = DataPort(bool, "Executed if input matches the value.", OUT)
value2 = Port(None, object, """Value to compare to. Usually a scalar.""")
out2 = DataPort(bool, "Executed if input matches the value.", OUT)
value3 = Port(None, object, """Value to compare to. Usually a scalar.""")
out3 = DataPort(bool, "Executed if input matches the value.", OUT)
out_default = DataPort(bool, """Executed if input matches none of the values.""", OUT)

In other circumstances, you might have multiple IN DataPorts, which allows the node to receive data from multiple other nodes. In this example, we define several IN DataPorts and one OUT DataPort, and the node then programmatically merges the incoming data into a single outgoing stream.

# MergeStreams.py

  data1 = DataPort(Packet, "Input data 1.", IN)
  data2 = DataPort(Packet, "Input data 2.", IN, mutating=False)
  data3 = DataPort(Packet, "Input data 3.", IN, mutating=False)
  data4 = DataPort(Packet, "Input data 4.", IN, mutating=False)
  data5 = DataPort(Packet, "Input data 5.", IN, mutating=False)

  outdata = DataPort(Packet, "Data to process.", OUT)

For more information and non-standard use cases, have a look at the DataPort class. You will see that the port is doing a few things on your behalf, but if you are deviating too much from the trodden path these things may surprise you if you're not aware of them.

Node properties

Property ports are used to define the properties of the node which can be set by the user through the API (or using a front end that communicates with Neuropype through the API, such as Pipeline Designer), and/or provided by another node wired to this node. These can also be defined as IN, OUT, or INOUT (although usually they're all INOUT), take a default value, be “editable” in a GUI, be required, etc.

A Port is like a Python property, but with some additional sugar layered on top, including help text, type, and some useful default behavior that makes your life easier (we hope!).

A port's main arguments are:

  • default value
  • value_type ('string', 'int', etc.) [in EnumPort this is replaced by a set of possible values]
  • help text (description)
  • direction (IN/OUT/INOUT)

For the full list of arguments and keyword arguments, see the Port class.

There are different Port subclasses representing the type of data the property is set to: FloatPort, ListPort, StringPort, etc. There is also a special EnumPort class that only accepts a value belonging to the set defined in the port instance.

A port automatically validates that the type of the value that is passed matches the port class (i.e., IntPort accepts int values only) and returns an error if the type is incorrect. (However, when accessing values through the NeuroPype REST API, they are returned as string representations, so a bool value would be returned as "True" or "False", a float as "6.5", etc.)
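The validation idea can be sketched with a plain Python descriptor. TypedPort and DemoNode below are hypothetical stand-ins written for illustration; they are not NeuroPype's Port classes and do not reproduce their full behavior.

```python
class TypedPort:
    """Hypothetical stand-in for a typed port (not NeuroPype's Port)."""

    def __init__(self, default, value_type, help_text=""):
        self.default = default
        self.value_type = value_type
        self.help_text = help_text

    def __set_name__(self, owner, name):
        # Values live on the instance in an underscore-prefixed field.
        self.field = "_" + name

    def __get__(self, instance, owner=None):
        if instance is None:
            return self
        return getattr(instance, self.field, self.default)

    def __set__(self, instance, value):
        # Reject values whose type does not match the declared type.
        if not isinstance(value, self.value_type):
            raise TypeError(
                f"expected {self.value_type.__name__}, "
                f"got {type(value).__name__}")
        setattr(instance, self.field, value)


class DemoNode:
    order = TypedPort(4, int, "Filter order.")


node = DemoNode()
node.order = 8              # accepted: matches the declared type
try:
    node.order = "eight"    # rejected: wrong type
    rejected = False
except TypeError:
    rejected = True
print(node.order, rejected)
```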

# Rereferencing.py

axis = EnumPort("space", tuple(axis_names), """Axis along which to take the
    reference or baseline.""", verbose_name='select reference along axis')
reference_range = Port(":", object, """Data range to use as reference /
    baseline. When the space axis is used (standard re-referencing), this is
    typically a channel range, e.g. ':' or ['TP8', 'TP9'] or 'Chn1':'Chn10',
    and the reference unit is set to 'names'. When the time axis is used
    (baseline removal), this is typically the baseline time window in
    seconds, e.g., -0.2:0.0.""", verbose_name='reference selection')
reference_unit = EnumPort("names", tuple(axis_units), """Unit in which the
    reference range is given. Depending on the axis, different units are
    applicable, e.g., names for space, seconds for time, indices for
    any axis.""", verbose_name='unit of measurement for selection')
estimator = EnumPort("mean", ("mean", "median", "trim_mean", "min",
                              "max"), """Estimator to use. Mean is the
    standard choice, median is a robust alternative (tolerates, e.g., bad
    channels) and trim_mean is a robust alternative which does not include
    the outliers in the mean calculation.""", verbose_name='use estimator')
cut_prop = FloatPort(0.1, None, """Fraction of the outliers to cut
    off for trim_mean option.
    """, verbose_name='proportion cutoff for trim mean option')

Class constructor

A node’s __init__ method usually just calls the parent class' __init__ method and creates a new node instance with the Port properties defined by the node's class (as explained above).

# Rereferencing.py

def __init__(self, **kwargs):
    """Create a new node. Accepts initial values for the ports."""
    super().__init__(**kwargs)

If your node maintains some internal state (e.g., if it has to seamlessly process successive chunks of streaming data), you would typically initialize any fields you need in the constructor. It is also common to instead have a method to reset the state of your node, and to just call that in the constructor.

# MovingAverage.py

def __init__(self, **kwargs):
    """Create a new node. Accepts initial values for the ports."""
    self._reset_states()
    super().__init__(**kwargs)

def _reset_states(self):
    """Reset the current filter kernels and filter states."""
    self._buffer = {}  # a ring buffer that holds the previous order samples
    self._index = {}   # index into the ring buffer for next update
    self._accum = {}   # current running sum of ring buffer contents

Performing operations on node properties

You may need to perform certain operations on a port's properties when they are created or updated, such as additional validation or manipulation. This can be done by overriding the port’s default setter method, which is invoked when data is written to the property when the node is instantiated or when the property is set through the API or by another node.

In this example, we use parse_range to parse the reference_range property in order to catch an error (handled outside the node by a global error catcher) and notify the user if the value being written to the property is incorrect.

# Rereferencing.py

@reference_range.setter
def reference_range(self, v):
    parse_range(v, globals=globals())
    self._reference_range = v

Note how in the above example the value v is assigned to self._reference_range -- this is the default behavior of any port's setter, and the default getter will in turn read from that field, unless you override its code, too.

We can also override a node property’s getter method in order to manipulate the data (reformat, etc.) when accessing it from the main processing method, or another method in the class, or if the property is accessed through the REST API. However, this is rarely done for configuration properties, and the Rereferencing node is an exception in this regard.

In this example, we convert the axis name, when it is written (@axis.setter) and read (@axis.getter), between a string and a different internal data type that simplifies our data processing elsewhere in the node (although we could have skipped that and made the conversion on the fly whenever the internal form of the value would be needed). It is recommended to return the original data type in the getter to avoid confusion when an external system queries the port's values and suddenly gets a different data type back.

# Rereferencing.py 

@axis.setter
def axis(self, value):
    self._axis = axis_definers[value]
    self.signal_changed(True)

@axis.getter
def axis(self):
    return self._axis.tag()
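The same round trip can be shown with an ordinary Python property. This is only an analogy: AxisSetting and the AXIS_INDEX lookup table are invented for illustration and do not correspond to NeuroPype's axis_definers.

```python
# Hypothetical lookup tables between the external and internal forms.
AXIS_INDEX = {"space": 0, "time": 1, "frequency": 2}
AXIS_NAME = {index: name for name, index in AXIS_INDEX.items()}


class AxisSetting:
    """Stores an axis internally as an index but exposes it as a string."""

    def __init__(self, axis="space"):
        self.axis = axis  # goes through the setter below

    @property
    def axis(self):
        # Convert back so that callers always see the original data type.
        return AXIS_NAME[self._axis]

    @axis.setter
    def axis(self, value):
        # Convert once on write; other methods can use self._axis directly.
        self._axis = AXIS_INDEX[value]


setting = AxisSetting()
setting.axis = "time"
print(setting.axis, setting._axis)
```

Whatever goes in as a string comes back out as a string, even though the internal representation differs.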

Main node logic

In a sense, the internal behavior of your node is very simple: data flow through your node pretty much happens just by values being assigned to your input ports, and results being read out of your output ports. You take care of all the processing in the setters and/or getters (or at least kick it off from there).

The main logic that processes the signal data coming through the node will go in the setter function for the DataPort transporting the data. Typically this DataPort is simply called data, as we saw earlier. This setter function (@data.setter) has two arguments: the object itself, and the data coming into the object (conventionally referred to as v for value).

# Rereferencing.py

@data.setter
def data(self, v):
    # do your processing here

After we have finished processing the data, we write the result into the node's ._data property, from where it will be read when the data port is accessed (as happens when the output data is transferred out of the node, using the default getter on your data port).

    self._data = v

In this case, the DataPort is INOUT (the default), so the data is being written to self._data for later pickup.
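The "assign to the input, read from the output" flow can be imitated with plain Python properties. Scale and Offset below are hypothetical toy classes, not NeuroPype nodes, and the hand-written assignment stands in for what the pipeline engine would do along a connection.

```python
class Scale:
    """Toy stand-in node: multiplies incoming numbers by a factor."""

    def __init__(self, factor):
        self.factor = factor
        self._data = None

    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, v):
        # Processing happens in the setter; the result is stored for pickup.
        self._data = [x * self.factor for x in v]


class Offset:
    """Toy stand-in node: adds a constant to incoming numbers."""

    def __init__(self, amount):
        self.amount = amount
        self._data = None

    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, v):
        self._data = [x + self.amount for x in v]


first, second = Scale(2), Offset(1)
first.data = [1, 2, 3]     # assigning triggers the first node's processing
second.data = first.data   # reading the output and passing it on
print(second.data)         # [3, 5, 7]
```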

For advanced users: if we had an IN DataPort and an OUT DataPort with different names, we would store the data in the field that is read by the OUT DataPort instead. For example, if we had an IN DataPort called indata and an OUT DataPort called outdata, we would define the @indata.setter function and end it with self._outdata = v. However, we recommend keeping it as simple as possible to avoid potential pitfalls: consider what happens if v is None. It turns out that, by default, the DataPort will helpfully skip your setter in that case and write None directly into the receiving port's own field (self._data for a port named data, self._indata in this example), so that you don't have to handle this case yourself (this is because the DataPort has its bypassnone flag set to True). But this means that, when v becomes None, your _outdata value will not be set to None but will instead keep its old value, and your node keeps outputting the last value, which is almost certainly not what you want. One solution would be to set that flag to False when declaring the indata port. This is a good reminder that Port does some extra work on your behalf that can be controlled by flags, so it's helpful to become familiar with the Port class.
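The stale-output pitfall can be reproduced in a small model. ForkNode is hypothetical, and assign_indata only imitates the bypass behavior as described in the paragraph above; it is not how the DataPort is actually implemented.

```python
class ForkNode:
    """Toy model of a node with separate 'indata' and 'outdata' fields."""

    def __init__(self, bypass_none=True):
        self.bypass_none = bypass_none
        self._indata = None
        self._outdata = None

    def assign_indata(self, v):
        # Models the port machinery: with bypass_none set, a None value is
        # stored directly and the custom setter logic is skipped.
        if v is None and self.bypass_none:
            self._indata = None
            return
        self._on_indata(v)

    def _on_indata(self, v):
        # Custom setter logic: forward the value to the output field.
        self._indata = v
        self._outdata = v


stale = ForkNode(bypass_none=True)
stale.assign_indata("packet 1")
stale.assign_indata(None)       # setter skipped, output keeps old value

fresh = ForkNode(bypass_none=False)
fresh.assign_indata("packet 1")
fresh.assign_indata(None)       # setter runs, output is cleared

print(stale._outdata, fresh._outdata)
```

With the bypass enabled the output field still holds the previous packet; with it disabled the None value propagates to the output.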

Data structures transmitted between nodes

NeuroPype allows any data type to be transmitted between processing nodes. However, the system is most effective when a unified data structure is used to represent the data items in the main data flow (i.e., the data that shall be processed). The data flow along an "edge" (connection between two nodes) of the flow graph can be pictured as a series of packets that are generated by one node, and then accepted and processed by another node. The Packet class defines this structure.

At the core of this data structure is an n-way tensor (the Block data structure, defined in the Block class) that can hold any type of array-shaped data. The key feature of a Block is that it does not just hold the raw numbers, but also the axes that describe the meaning of the tensor dimensions (e.g., time, space, frequency, etc.). There are many types of axis classes which can store any kind of per-axis metadata, such as the time points and units of each tensor coordinate, but they all adhere to a unified interface, which allows them to be managed in a Block.

Since we need not just per-axis metadata, but also metadata about the entire Block (e.g., where it comes from and what it represents), the block is wrapped in a slim wrapper class called Chunk. A Chunk simply adds a property dictionary (actually a property tree, although properties are rarely nested) to a block.

A series of successive Chunks sent or received by a node along an edge can then be thought of as a data “stream” within the pipeline, e.g., a real-time signal coming from a sensor, or an entire array of recorded data loaded from a file. In order to allow a single edge (connection) to transmit not just one stream but arbitrarily many parallel streams (e.g., data from multiple devices, including data that are not necessarily clocked at the same rate and therefore can't just be multiplexed into simply more channels), the Packet data structure holds an array of named Chunks – one for each stream. Thus, when processing real-time data streams, a single packet can be thought of as a snapshot of data chunks from multiple streams that all come from the same time period.
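The nesting described above can be pictured with a few dataclasses. ToyAxis, ToyBlock, ToyChunk and ToyPacket are simplified, hypothetical stand-ins; the real Packet, Chunk and Block classes carry far more behavior, and their constructors are not shown here.

```python
from dataclasses import dataclass, field


@dataclass
class ToyAxis:
    kind: str        # e.g. "space", "time", "frequency"
    values: list     # per-coordinate metadata (names, time points, ...)


@dataclass
class ToyBlock:
    data: list       # nested lists standing in for an n-way array
    axes: list       # one ToyAxis per array dimension


@dataclass
class ToyChunk:
    block: ToyBlock
    props: dict = field(default_factory=dict)   # metadata about the block


@dataclass
class ToyPacket:
    chunks: dict = field(default_factory=dict)  # stream name -> ToyChunk


eeg = ToyChunk(
    block=ToyBlock(
        data=[[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]],   # 2 channels x 3 samples
        axes=[ToyAxis("space", ["C3", "C4"]),
              ToyAxis("time", [0.0, 0.01, 0.02])],
    ),
    props={"modality": "EEG"},
)
packet = ToyPacket(chunks={"eeg": eeg})

for name, chunk in packet.chunks.items():
    print(name, [axis.kind for axis in chunk.block.axes])
```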

For some nodes it makes sense to transmit other data structures, for instance simple built-in Python types, but keep in mind that you will want to have maximum interoperability between nodes, so if you invent a custom data structure, it will initially only be understood by a handful of your own nodes.

Processing the data

Now that we understand how the data passing through a node is structured, and we have defined the ports to receive and send the data, as well as the setter function that is triggered when the data is received by the node (typically @data.setter assuming an IN DataPort named data), we can add the "meat" of our node, which is the programming or algorithmic code that performs the signal processing itself.

The typical pattern for processing nodes is to override the data port setter, and to iterate over any non-empty stream chunks in Packet v that match the desired criteria. Usually any chunk that has the right list of axes qualifies, and in some cases some additional metadata flags (.props of the Chunk) need to be set in a certain way. Here is a typical example:

# Rereferencing.py

# do your processing here
for _, chunk in enumerate_chunks(v, nonempty=True,
                                 with_axes=self._axis):

One example of a flag is that we can add allow_markers=False as an argument to the enumerate_chunks function to avoid iterating over marker streams (these are quite different from a typical channels x samples time-series chunk, and can easily throw off our processing code if not filtered out). (See enumerate_chunks arguments and methods.)

Once we have the chunk, another common pattern in NeuroPype nodes is to use the bracket operator on its contained block (which calls __getitem__) to reorder the axes in the desired way; this will result in a "view" into the underlying array, that is, writing into the view will change the original data (in this case, v).

    view = chunk.block[self._axis, ...]

After having reordered the axes to our liking, we will be able to perform operations where we can presuppose specific positions of the axes that we're interested in (in this example, the first axis is the one over which we will average).

In this example node, next we parse the selection range and unit specified by the user by calling the parse_range helper function. This function is quite versatile and understands a number of range syntaxes. You'll recall that reference_range and reference_unit are Ports defined earlier in this node.

    sel = parse_range(self.reference_range, self.reference_unit)

Once we have parsed the range to be processed into a standard Python slice object, we can use the axis[range] expression below to perform the actual selection. To understand how this works, see the Block class data structure, and specifically the __getitem__ method (called with [] as mentioned earlier):

    seldata = chunk.block[self._axis[sel, self.reference_unit], ...]

Now that our data axes are arranged to our liking, and we've selected the range to operate on, we can do the averaging (the purpose of this node), depending on the user-defined setting of the estimator EnumPort defined earlier.

    if self.estimator == 'mean':
        ref = np.mean(seldata.data, axis=0)
    elif self.estimator == 'min':
        ref = np.min(seldata.data, axis=0)
    elif self.estimator == 'max':
        ref = np.max(seldata.data, axis=0)
    elif self.estimator == 'median':
        ref = np.median(seldata.data, axis=0)
    elif self.estimator == 'trim_mean':
        from scipy.stats import trim_mean
        ref = trim_mean(seldata.data, self.cut_prop, axis=0)
    else:
        raise RuntimeError("unrecognized re-referencing estimator.")

Then we subtract the average from the data, and by writing the result back into the view object defined earlier, we modify the data packet v. (Remember that we are running this for each chunk in v, where a chunk is represented by view.)

    view.data = view.data - ref.data

Lastly, as we mentioned earlier, when we're done processing all of the data chunks, we write the results of v to the node's data property:

self._data = v

We're done!
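To make the arithmetic of this walkthrough concrete without requiring NeuroPype, here is a minimal pure-Python sketch that works on a list of channels, each a list of samples. The names rereference and trimmed_mean are hypothetical, the trimmed mean is a simple stand-in (it drops int(cut * n) values from each end of the sorted data) rather than a call to scipy, and none of the axis or range handling is reproduced.

```python
from statistics import mean, median


def trimmed_mean(values, cut):
    """Mean after dropping int(cut * n) values from each end (sorted)."""
    ordered = sorted(values)
    k = int(cut * len(ordered))
    return mean(ordered[k:len(ordered) - k])


def rereference(data, estimator="mean", cut=0.1):
    """Subtract a per-sample reference computed across all channels."""
    estimators = {
        "mean": mean,
        "median": median,
        "min": min,
        "max": max,
        "trim_mean": lambda column: trimmed_mean(column, cut),
    }
    if estimator not in estimators:
        raise ValueError("unrecognized re-referencing estimator.")
    estimate = estimators[estimator]
    # zip(*data) yields, for each sample, the values of all channels.
    ref = [estimate(column) for column in zip(*data)]
    return [[x - r for x, r in zip(channel, ref)] for channel in data]


channels = [[1.0, 2.0], [3.0, 4.0], [8.0, 12.0]]
by_mean = rereference(channels, "mean")
by_median = rereference(channels, "median")
print(by_mean)    # [[-3.0, -4.0], [-1.0, -2.0], [4.0, 6.0]]
print(by_median)  # [[-2.0, -2.0], [0.0, 0.0], [5.0, 8.0]]
```

Note how the median-based reference is less affected by the third channel, whose values are much larger than the others.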

Nodes that maintain internal state

If your node maintains internal state, you must at the very least clear it at the right time (namely, when your node is told to do so). Your node should also be a good citizen and inform subsequent nodes that they need to clear their own state (if any), for instance because your node's data format changed mid-flight after the user changed a critical property setting.

These things are usually very easy to handle with a few one-liners, but they are very important to prevent unexpected crashes of your (or other) nodes.

The most common case is that your node is being told that the format of the input data (the "signal") has changed. To handle this, you override the method on_signal_changed that's provided by the base class. In response, you need to clear any state that you may have, usually using a method that you've written for that purpose.

# MovingAverage.py 

def on_signal_changed(self):
    """Callback to reset internal state when an input wire has been
    changed."""
    self._reset_states()

Another case is when one of your own properties is changed in a way that requires you to clear your state (e.g., the buffer length or the set of channels to buffer). You can handle this either by overriding the setter of each and every port that might break your state when changed (perhaps a little tedious, and it is easy to overlook one), or by implementing a catch-all handler that is triggered when a value is assigned to any of your ports (except ports whose setters you've overridden yourself, such as data). That handler is called on_port_assigned:

# MovingAverage.py

def on_port_assigned(self):
    """Callback to reset internal state when a value was assigned to a
    port (unless the port's setter has been overridden)."""
    self._reset_states()

Lastly, about being a good citizen: if your node's configuration changes in a way that causes it to output data that is formatted (or, e.g., scaled) differently than before the settings change, you need to emit a signal-changed notification. This is as simple as calling the following method (which you've already seen in the above example where the @axis.setter was overridden):

    self.signal_changed(True)
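The reset-on-change idea can be sketched in isolation. RunningMean is a hypothetical stand-in that does not derive from Node; its property setter plays the role that on_port_assigned or an overridden port setter would play in a real node.

```python
from collections import deque


class RunningMean:
    """Toy stateful processor that averages the last `order` samples."""

    def __init__(self, order=3):
        self._order = order
        self._reset_states()

    def _reset_states(self):
        # All internal state is created in one place so it is easy to clear.
        self._buffer = deque(maxlen=self._order)

    @property
    def order(self):
        return self._order

    @order.setter
    def order(self, value):
        # Changing the window length invalidates the buffered samples.
        self._order = value
        self._reset_states()

    def process(self, sample):
        self._buffer.append(sample)
        return sum(self._buffer) / len(self._buffer)


avg = RunningMean(order=2)
outputs = [avg.process(x) for x in (2.0, 4.0, 6.0)]
avg.order = 3                     # triggers a state reset
after_reset = avg.process(10.0)   # old samples no longer contribute
print(outputs, after_reset)
```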

Other important features

If you're writing a node that is not just a simple signal processing node, chances are that you'll need to declare a few other interactions with your environment. For this, it helps to get familiar with the methods of the Node base class, which cover nearly everything you might need to do in your custom node.

Some examples of features that may be relevant when you're writing a not-so-simple processing node:

  • Releasing operating system resources: if your node holds OS resources, such as file handles, network sockets, GUI windows, and the like, your node should override the on_release() function and release these resources in return. This gets called, for instance, when the graph of nodes is unloaded or reloaded, or when your node is removed from the graph.
  • Declaring when your node is finished processing: if you are writing a data source node (e.g., file import, directory traversal, network reader, device input), you almost certainly need to override the is_finished() method, which shall return False if your node will continue to return data on future calls to getters (e.g., a node listening on the microphone is never done returning data), and True otherwise (done traversing the directory, no more files to emit). This is needed when someone builds a pipeline that acts like a processing script, and terminates itself when it is done processing.
  • Exposing trainable state: if your node has state that is adapted based on significant amounts of data (as in machine learning and some signal processing), you can declare that state, and allow the user to save/load snapshots of their graph's trainable state, including that of your node as models that can later be applied out of the box to new data. For this, you override get_model() and set_model().
  • Declaring whether your node is currently calibrating: this is mostly a bells & whistles type feature, but sometimes it can be hard for a user to know why their pipeline is not yet returning any data. When your node declares that it's still buffering calibration data, this makes working with your node in a pipeline that much easier. For this, you simply override the is_calibrating() method and return True or False.
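Two of the hooks listed above, is_finished() and on_release(), can be illustrated with a toy data source. LineSource is hypothetical and does not derive from Node; the next_line method and the driving loop are invented for this sketch and only imitate how a script-like pipeline might poll a source until it is exhausted.

```python
import io


class LineSource:
    """Toy data source that emits one line of text per call."""

    def __init__(self, stream):
        self._stream = stream
        self._finished = False

    def next_line(self):
        line = self._stream.readline()
        if line == "":
            # An empty string means end of input: nothing more to emit.
            self._finished = True
            return None
        return line.rstrip("\n")

    def is_finished(self):
        # False while more data may follow, True once the source is done.
        return self._finished

    def on_release(self):
        # Give back the OS-level resource held by this object.
        self._stream.close()


source = LineSource(io.StringIO("a\nb\n"))
lines = []
while not source.is_finished():
    line = source.next_line()
    if line is not None:
        lines.append(line)
source.on_release()
print(lines)
```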

You can also find example nodes with tutorial-style documentation that highlight how to use some of these features in the Examples/ folder that comes with your NeuroPype distribution.

Node Description / Documentation

Properly documenting a node is essential to enabling users to use it correctly. The NeuroPype convention is to document a node in the source code of the node itself.

The description() method defines the node’s name and explains the node’s function and how to use it. This information can be retrieved through the API and is shown in the Pipeline Designer GUI when selecting the node. It also defines the node version number and optionally license and url for more documentation.

@classmethod
def description(cls):
    """Declare descriptive information about the node."""
    return Description(name='Re-referencing',
                       description="""\
                       Subtract the average of some reference data. The two
                       most common use cases of this are a) re-referencing
                       of EEG/EMG, where one or more channels are averaged,
                       and the result is subtracted from all channels, and
                       b) baseline removal, where the average in some
                       baseline time window is taken, and subtracted from
                       the whole time period.

                       The range can be given either as a list of values
                       (e.g., indices or channel names in single quotes),
                       formatted as [value1, value2, ...], or as a range
                       expression of the form start:stop, where all
                       successive indices beginning with start and up to
                       one prior to stop are included. This node also
                       supports robust averaging using the median,
                       which can be useful for robust re-referencing,
                       where one or more channels are particularly
                       noisy.""", version='1.0.0')

The description docstring should follow a particular convention for best results with external tools: the first sentence should be the short executive summary of what the node does (this may be the same as the one-liner docstring of your class). The paragraph following that is the essential description of your node. Any subsequent paragraphs comprise extended description that may or may not be shown in some abbreviated documentation views.

As for documenting a node's properties, the verbose_name argument allows you to give the port a human-readable name, while the description argument allows you to add a longer description of how the property is used or what it should contain. This information can be retrieved through the API, and in the case of the Pipeline Designer, the description argument is displayed when hovering over the value's input box in the node properties window.

Creating a new category/package of nodes

To create a new category of nodes, you create a new Python package directory in the nodes/ folder of NeuroPype (e.g., image_processing). You need to add an __init__.py file, which should be structured like the following example:

"""Image processing algorithms.

This example package declares a number of image processing algorithms.

Copyright statement on last line.
"""

from .._shared import import_nodes
__all__ = import_nodes(__file__, __name__, __package__)

The docstring is required, because it is recognized by external tools such as the Pipeline Designer. It should have an executive summary on the first line, optionally followed by additional documentation text. The last line of the docstring is reserved for Author/Copyright information and is stripped off by tools.
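To illustrate the docstring layout, here is a sketch of how a tool might split such a docstring into its three parts. The helper split_package_doc is hypothetical; the actual parsing done by the Pipeline Designer or other tools is not documented here and may differ.

```python
def split_package_doc(doc):
    """Split a package docstring into (summary, body, footer)."""
    lines = doc.strip().splitlines()
    summary = lines[0].strip()                        # first line
    footer = lines[-1].strip() if len(lines) > 1 else ""  # last line
    body = "\n".join(lines[1:-1]).strip()             # everything between
    return summary, body, footer


example_doc = """Image processing algorithms.

This example package declares a number of image processing algorithms.

Copyright statement on last line.
"""

summary, body, footer = split_package_doc(example_doc)
print(summary)
print(body)
print(footer)
```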

You also need to copy/paste the two code lines verbatim to have your package behave the same way as the other packages, namely to have it auto-import any .py files in the folder as the nodes in the package.

Note: currently, nothing stops you from declaring all your nodes right in the __init__ file, but this can get very confusing very quickly, and in the future, tools may reject custom package formats like that.

Importing third party libraries

If your node requires a third party library, you'll need to make sure that it's available in the same python environment used by your Neuropype installation (located in your Neuropype install folder, under Neuropype\python).

There are two ways you can do this:

If you use conda to manage your python environments, you can activate the python environment that ships with Neuropype like this:

conda activate c:\path\to\neuropype\installation\python

then use conda or pip to install the third party library as usual.

Or, instead of activating the conda environment, you can install the third party library with pip, by invoking the python executable that ships with Neuropype, like this:

cd c:\path\to\neuropype\installation\python
python -m pip install library_name
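After installing, one way to confirm that the library is visible to a given interpreter is a short check like the following, run with the python executable in question. The helper is_importable is a hypothetical name, and json is used only as a module that is always present; substitute the top-level module name of your library.

```python
import importlib.util
import sys


def is_importable(module_name):
    """Return True if a top-level module can be found by this interpreter."""
    return importlib.util.find_spec(module_name) is not None


# Shows which interpreter is running, to verify it is the intended one.
print("interpreter:", sys.executable)
print("json available:", is_importable("json"))
```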

Copyright 2018-2019 Syntrogi Inc. dba Intheon. All Rights Reserved.