The NeuroPype API

NeuroPype has a full-featured REST API that provides all the functionality needed to create, modify, configure and run pipelines. This makes it easy to create user-facing applications that communicate with NeuroPype, or add signal processing capability to existing applications. The Pipeline Designer and the NeuroPype Control Panel applications distributed with the NeuroPype Suite are examples of such applications.

Functionality available through the API:

  • Establish a connection to the running NeuroPype instance
  • Query all loaded pipelines and their status
  • Run / pause / resume / stop / restart a pipeline
  • Terminate a pipeline (delete the running process to free up resources)
  • View / modify pipeline node parameters (before or while the pipeline is running)

The following advanced functions are also available over the API, providing you full control over the creation and editing of pipelines. These are documented in the Developer Reference.

  • Save changes to a pipeline to disk
  • Reload a pipeline from disk
  • Query a pipeline's graph (nodes & edges)
  • Add / remove nodes (functions) in a pipeline
  • Add / remove edges (connections between nodes) in a pipeline
  • Query all available packages (groups) of nodes
  • Query all available node classes
  • Query details about a node class
  • Query a node's ports (through which it shares data with other nodes)

Guide notes:

  • In the examples in this guide we use Python syntax (using the requests library) to communicate with NeuroPype. However, any programming language that supports HTTP queries can be used.

  • API responses are JSON-formatted text, so they need to be parsed into native objects before you can work with them, such as by calling .json() on the response object in Python. For the sake of readability, we omit this from all the examples in this guide.
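
For example, a minimal helper that performs a GET request and parses the response (the helper name is ours, not part of the API):

```python
import requests

def get_json(url, **kwargs):
    """GET a NeuroPype API endpoint and return the parsed JSON body."""
    resp = requests.get(url, **kwargs)
    resp.raise_for_status()   # fail fast on non-2xx responses
    return resp.json()        # parse the JSON text into Python objects

# e.g. executions = get_json('http://127.0.0.1:6937/executions')
```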

The NeuroPype engine instance

First, you'll need to know the host and the port on which the NeuroPype engine is running. If it is running on the same machine, the host will be 127.0.0.1 (localhost). By default NeuroPype listens on port 6937, though both can be changed when launching NeuroPype (using the --host and --port arguments). Check the logs when starting NeuroPype to see which port it started on. For this documentation, we'll assume 127.0.0.1 and port 6937.

http://127.0.0.1:6937

NeuroPype 'executions'

NeuroPype creates separate execution contexts, called 'executions'. An 'execution' is a separate process into which a graph or pipeline can be loaded (from data or a file), then modified by loading additional parameters or models, and its state changed to run, pause, etc. The same NeuroPype instance can have multiple executions. This allows multiple clients, or multiple instances of a client, to run or interact with different pipelines (or different instances of the same pipeline) simultaneously. Each execution holds only one pipeline.

List of executions

We can fetch the list of currently running executions with a GET request to our base URL http://host:port/executions. Example:

requests.get('http://127.0.0.1:6937/executions')

If NeuroPype was just launched, this will return an empty array since no executions are created on launch. So our first step is to create an execution.

If executions are already running, this will return a list of their ids. For example:

[{'id': 2420}, {'id': 2421}]

Creating a new NeuroPype execution

We create an execution by sending a POST request to http://host:port/executions and passing a JSON string with the execution's properties. This string can be empty if we want to launch an execution with the default properties (typically the case; see the Execution Info section below for a list of the execution properties and their defaults).

requests.post('http://127.0.0.1:6937/executions', json={})

The execution id

The previous command will return a response that includes the id of the execution (a 4-digit integer), assigned automatically by NeuroPype. We'll want to store that id as it will be used for all further interactions with the execution through the API. For example:

execution_id = response.json()['id']

Now that we have the id of the execution, for the sake of simplicity in the examples in this guide, we'll create a base URL that we will use for all of our interactions with this NeuroPype instance:

URL = 'http://127.0.0.1:6937/executions/3288'
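
The create-and-store-the-id steps above can be wrapped in a small sketch (the function name is illustrative; the id in the response is whatever NeuroPype assigns):

```python
import requests

API = 'http://127.0.0.1:6937'   # host and port assumed throughout this guide

def create_execution(api=API, info=None):
    """Create a new execution and return its base URL.

    `info` optionally carries execution properties, e.g.
    {'log_level': 10, 'tickrate': 25.0}.
    """
    # an empty JSON body launches an execution with default properties
    resp = requests.post(api + '/executions', json={'info': info} if info else {})
    resp.raise_for_status()
    exec_id = resp.json()['id']            # id assigned by NeuroPype
    return f'{api}/executions/{exec_id}'
```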

Deleting an execution

We can delete an execution, which will terminate the process, with a DELETE request to /executions/execution_id. Example:

requests.delete('http://127.0.0.1:6937/executions/3288')

Deleting an execution immediately terminates any pipeline running in it, exits the process in which the pipeline was running, and frees up all its resources; the pipeline will no longer be accessible. If you're done with a pipeline, it's a good idea to delete its execution to free up resources, but to run the pipeline again you'll then need to create a new execution and reload the pipeline.

Execution info (properties)

We can query an execution for its properties by sending a GET request to /executions/execution_id/info:

requests.get('http://127.0.0.1:6937/executions/3288/info')

This will return a JSON object with the execution properties. The defaults are:

{'log_level': 20, 'error_mode': 'ignore', 'tickrate': 25.0}
  • log_level: The log level of the execution, using the standard Python logging level codes.
  • error_mode: Determines how NeuroPype engine handles errors in nodes. Most common options are:
    • 'ignore': errors are ignored and execution of the pipeline continues, while printing an exception traceback (default)
    • 'debug': errors are thrown and no cleanup action is taken before debugger stops at exception breakpoint
    • 'stop': execution of the pipeline is halted
  • tickrate: Tick rate when doing real-time processing

To change the info properties, such as to change the logging level or error handling mode, pass these properties with the POST request when creating the execution. These cannot be changed after the execution is created. For example, to set the tick rate and logging level, create the execution like this:

requests.post('http://127.0.0.1:6937/executions', json={'info': {'log_level': 10, 'tickrate': 25.0}})

NeuroPype version and install folder

The easiest way to check whether NeuroPype is running is to send a GET request to /version:

requests.get('http://127.0.0.1:6937/version')

If successful, this will return a 200 response and a JSON object with the following keys: version (the NeuroPype version number), edition (the NeuroPype edition, e.g., Academic), and install (the NeuroPype installation folder).
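
A connectivity check along these lines (the function name is illustrative; it returns None when the engine is unreachable):

```python
import requests

def neuropype_info(api='http://127.0.0.1:6937', timeout=2.0):
    """Return (version, edition, install) if NeuroPype responds, else None."""
    try:
        resp = requests.get(api + '/version', timeout=timeout)
    except requests.ConnectionError:
        return None                     # engine not running or unreachable
    resp.raise_for_status()
    data = resp.json()
    return data['version'], data['edition'], data['install']
```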

Pipelines

Load a pipeline

Now that we have the execution id (3288 in our example), we can load and interact with a pipeline.

As explained above about Executions, in all the examples that follow we use 'URL' to reference the base URL for our current execution (host:port/executions/execution_id), which looks like this:

URL = 'http://127.0.0.1:6937/executions/3288'

When referring to API requests, we'll just list the part that comes after the base URL. So a reference to /actions/load means http://host:port/executions/execution_id/actions/load, or http://127.0.0.1:6937/executions/3288/actions/load in our example.

To load a pipeline, send a POST request to /actions/load, passing a JSON object that contains the filename of the pipeline. This loads the pipeline's graph, which is the collection of all of the nodes and edges in the pipeline, into the execution.

requests.post(URL + '/actions/load', json={'file': '/my/path/mypipeline.pyp'})

If successful, this will return a 200 response with an empty string.

Setting pipeline parameters

You can easily modify one or more pipeline parameters that are exposed through ParameterPort nodes in the pipeline, using the /actions/configure API call (available as of version 2024.1.0). The keys in the json payload must match the names of ParameterPort nodes in the pipeline.

requests.post(URL + '/actions/configure', json={'highpass_filter': [0.5, 1], 'subject_id': 'P1'})

For NeuroPype versions prior to 2024.1.0, use the following method instead: after loading the pipeline, send an additional POST request to /actions/load, passing a JSON object whose 'what' entry is 'parameters' and whose 'data' entry is a dictionary keyed by the parameters to modify.

requests.post(URL + '/actions/load', json={'what': 'parameters',
                                           'data': {'highpass_filter': {'value': [0.5, 1]}, 'subject_id': {'value': 'P1'}}})

Note that the parameter value has to be nested in a dictionary with key 'value'. This method is still supported in versions 2024.1.0 and onward.

With either method, if successful, the call will return a 200 response with an empty string.

For an "offline" pipeline (a pipeline that processes one or more recorded datasets rather than streaming data), you will need to do this before running the pipeline (see below).

For an "online" or "streaming" pipeline (processing data in real-time), you can modify the parameters at any time by sending the above POST request while the pipeline is running, and the changes will take effect immediately. This allows you to control the flow of a pipeline from an external client in realtime (e.g., show a different set of frequencies, execute an optional step, etc.)
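
A sketch of such a live update (the helper name and parameter names are illustrative; the keys must match your pipeline's ParameterPort nodes):

```python
import requests

def set_pipeline_params(url, params):
    """Update ParameterPort values on a loaded or running pipeline.

    Uses /actions/configure (NeuroPype 2024.1.0+); `url` is the
    execution's base URL (host:port/executions/execution_id).
    """
    resp = requests.post(url + '/actions/configure', json=params)
    resp.raise_for_status()

# e.g., while the pipeline runs (hypothetical parameter name):
# set_pipeline_params('http://127.0.0.1:6937/executions/3288',
#                     {'highpass_filter': [1.0, 2.0]})
```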

Fetch the pipeline status

Since paused is True by default, the pipeline will be loaded but not executed.

We can check the status of the pipeline by sending a GET request to /state:

requests.get(URL + '/state')

This should return a JSON object along these lines:

{'paused': True, 'running': False, 'completed': False, 'calibrating': False, 'status': 'pending'}
  • running: whether the pipeline is currently being executed (default: False)
  • paused: whether the pipeline is paused (default: True)
  • completed: whether the pipeline has finished running (default: False)
  • calibrating: whether the pipeline is calibrating the data (default: False); if True, running is also True
  • needs_keepalive: set to True if you want the execution to automatically terminate unless it is pinged every 5 seconds (default: False)
  • had_errors: whether the pipeline executed but encountered an error (default: False)
  • status: a string indicating the current status of the execution; possible values are:
    • pending: has not yet started (or was reset and not yet restarted)
    • calibrating: running, but collecting/processing calibration data
    • running: processing data (possibly with intermittent errors if error_mode was set to 'ignore'; to determine that, check had_errors)
    • paused: the execution was paused
    • stopped: the execution was stopped and has not yet completed (it can be restarted)
    • succeeded: completed (no more data to process) without errors
    • failed: failed due to errors (some data may have been processed)
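
A client will often want to block until an offline pipeline finishes; one way to do that is to poll /state (a sketch; the function name is ours, and URL is the execution's base URL as elsewhere in this guide):

```python
import time
import requests

def wait_until_done(url, poll_interval=1.0, timeout=None):
    """Poll the execution's /state until completed is True; return the
    final state. Check status / had_errors afterwards to distinguish
    success from failure."""
    start = time.monotonic()
    while True:
        state = requests.get(url + '/state').json()
        if state['completed']:
            return state
        if timeout is not None and time.monotonic() - start > timeout:
            raise TimeoutError('pipeline did not complete in time')
        time.sleep(poll_interval)
```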

Run the pipeline

As of NeuroPype 2024.1.0, you can begin execution of the pipeline with a POST request to /actions/play:

requests.post(URL + '/actions/play')

If successful, this will return a 200 response and an empty string.

In earlier versions, instead send a PATCH request to /state, passing a JSON payload that sets the paused property to False and the running property to True:

requests.patch(URL + '/state', json={'running': True, 'paused': False})

This will return a JSON object with the modified state:

{'needs_keepalive': False, 'paused': False, 'running': True, 'completed': False, 'calibrating': False}

Pause / resume / stop / restart the pipeline

As of NeuroPype 2024.1.0, you can pause, resume, or stop execution of the pipeline with the appropriate following POST request:

requests.post(URL + '/actions/pause')
requests.post(URL + '/actions/resume')
requests.post(URL + '/actions/stop')

In earlier versions, instead send a PATCH request to /state and pass paused: True (pause), paused: False (resume) or running: False (stop):

requests.patch(URL + '/state', json={'paused': True})
requests.patch(URL + '/state', json={'paused': False})
requests.patch(URL + '/state', json={'running': False})

If you have stopped the pipeline, you can restart it with:

requests.post(URL + '/actions/play')

or, for NeuroPype 2024.0.0 and older,

requests.patch(URL + '/state', json={'running': True})

This restarts execution of the pipeline from the beginning, rather than resuming it (as when paused).

Terminate the pipeline

For "offline" pipelines (processing one or more recorded datasets), the pipeline should terminate once it has finished processing all the imported data.

For "online" pipelines (processing data in real-time), the pipeline will run indefinitely until it is terminated. To terminate an online pipeline, you can stop it with a POST request to /actions/stop as described above.

In either case, if you're done with the pipeline, you can delete the execution to free up system resources (see above section on Executions). This is more important with "offline" pipelines, which may hold an entire dataset in RAM.
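
Putting the previous sections together, a minimal end-to-end sketch for an offline pipeline (all names are illustrative and error handling is kept to a minimum):

```python
import time
import requests

API = 'http://127.0.0.1:6937'

def run_offline_pipeline(pipeline_file, api=API, poll=1.0):
    """Create an execution, load and play a .pyp file, wait for it to
    complete, then delete the execution to free its resources.

    Returns the final status string ('succeeded' or 'failed')."""
    exec_id = requests.post(api + '/executions', json={}).json()['id']
    url = f'{api}/executions/{exec_id}'
    try:
        requests.post(url + '/actions/load', json={'file': pipeline_file})
        requests.post(url + '/actions/play')
        # poll /state until the pipeline has processed all its data
        while not requests.get(url + '/state').json()['completed']:
            time.sleep(poll)
        return requests.get(url + '/state').json()['status']
    finally:
        requests.delete(url)   # always free the execution's process
```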

You can also use the following state flags to determine whether a pipeline will terminate automatically under certain conditions:

  • needs_keepalive: when set to True, the execution will terminate unless it is pinged via the API every 5 seconds; this can be used when you want to tie the life of the pipeline to another client. You can "ping" the pipeline with the /logs or /errors GET calls (fetching the logs or latest errors), or with the /actions/keepalive POST call (which simply returns a 200). This is useful for streaming (realtime or "online") pipelines, which otherwise run indefinitely until terminated through the API.

You can set this flag with a PATCH request to /state; this can be the same call that starts the pipeline. Example:

requests.patch(URL + '/state', json={'running': True, 'paused': False, 'needs_keepalive': True})

This will return a JSON object with the modified state:

{'needs_keepalive': True, 'paused': False, 'running': True, 'completed': False, 'calibrating': False}

Note that unless you plan to restart the pipeline, you will want to delete its execution as well as terminating it.
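
If you use needs_keepalive, the client must ping the execution more often than every 5 seconds. A background pinger might look like this (a sketch; the helper is ours, not part of the API):

```python
import threading
import requests

def start_keepalive(url, interval=2.0):
    """Ping /actions/keepalive in the background so an execution created
    with needs_keepalive=True stays alive; returns a stop() callable.

    The interval must stay comfortably under NeuroPype's 5-second window."""
    stop_event = threading.Event()

    def _ping():
        while not stop_event.wait(interval):
            try:
                requests.post(url + '/actions/keepalive')
            except requests.ConnectionError:
                break   # execution is gone; stop pinging

    threading.Thread(target=_ping, daemon=True).start()
    return stop_event.set
```

Call the returned function to stop pinging, e.g. right before deleting the execution.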

Sample script to launch multiple NeuroPype pipelines

"""
Sample script to start/stop a collection of pipelines over the Neuropype API
"""

import requests
import os
import glob
import argparse
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# default settings
nphost = 'http://localhost'
npport = '6937'
# path to folder where pipelines (.pyp) are located
rootpath = ''
# list of pipelines to run (basename including extension); otherwise all .pyp files in the rootpath are run
pipelines = []

# setup
npurl = nphost + ':' + npport
execurl = npurl + '/executions'
rootpath = os.path.abspath(os.path.expanduser(rootpath))

def errorcheck(resp):
    if resp.status_code not in [200, 201]:
        logger.error(f'Could not reach Neuropype! Please ensure it is running. Error: {resp.content.decode("utf-8")}')
        exit()

def startpipes(pipes):
    logger.info(f'Found {len(pipes)} pipelines ...')
    for pipe in pipes:
        pipepath = os.path.join(rootpath, pipe)
        logger.info(f'Launching pipeline: {pipepath}')
        # create new execution
        resp = requests.post(execurl, json={})
        errorcheck(resp)
        execid = resp.json()['id']
        pipeurl = f'{execurl}/{execid}'
        resp = requests.post(pipeurl + '/actions/load', json={'file': pipepath})
        errorcheck(resp)
        resp = requests.post(pipeurl + '/actions/play')
        errorcheck(resp)
    logger.info(f'{len(pipes)} pipelines launched.')

def stoppipes():
    resp = requests.get(execurl)
    errorcheck(resp)
    execs = resp.json()
    logger.info(f'{len(execs)} running pipelines found. Shutting them down ...')
    counter = 0
    for exec in execs:
        execid = exec['id']
        resp = requests.delete(f'{execurl}/{execid}')
        errorcheck(resp)
        counter += 1
    logger.info(f'{counter} pipelines terminated.')

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--start', action='store_true', help='Start all pipelines.')
    parser.add_argument('--stop', action='store_true', help='Stop all pipelines.')
    parser.add_argument('--path', default='', help=f'Specify path to folder containing pipelines to launch if not {rootpath}')
    arg = parser.parse_args()

    if arg.path:
        rootpath = os.path.normpath(arg.path)

    # get pipelines to run
    pipes = pipelines or glob.glob(os.path.join(rootpath, '*.pyp'))

    if not pipes:
        logger.error(f'No pipelines found in {rootpath}. Exiting.')
        exit()

    if arg.start:
        startpipes(pipes)
    elif arg.stop:
        stoppipes()
    else:
        logger.error('Please run as either "pipelines --start" or "pipelines --stop".')

Sample script to start/stop/configure a NeuroPype pipeline, fetch logs, and list or delete running pipeline executions

This more complete script is found in bin/exec.py in your NeuroPype installation, and can be used to start, stop, list, and delete pipeline executions. It also serves as an example of how to perform these functions from any remote client.

"""
Execute or control a NeuroPype pipeline and get its state and logs.
Set parameters on a new or running pipeline.
List all running pipelines and optionally delete them.
"""

import os
import sys
import signal
import time
from logging import getLevelName
import requests
import argparse

sys.path.append(os.path.dirname(os.path.dirname(__file__)))


def signal_handler(sig, frame):
    print('\nCtrl+C pressed. Exiting.')
    sys.exit(0)


def fetch_logs(url):
    """fetch logs continuously unless pipeline is done or stopped"""
    top = 0
    is_running, is_done = check_state(url)
    while is_running and not is_done:
        logs = requests.get('/'.join([url, 'logs']), params={'from_idx': top}).json()
        for rec in logs:
            print('%s: %s' % (getLevelName(rec['level']), rec['message']))
            top = max(top, rec['id'] + 1)
        time.sleep(0.5)
        is_running, is_done = check_state(url)

def check_state(url):
    """get pipeline state and return if it's running and/or completed"""
    state = requests.get(url+'/state').json()
    return state['running'], state['completed']


def pipe_info(npurl, exid):
    """print pipeline info: name and status"""
    state = requests.get(f'{npurl}/{str(exid)}/state').json()
    desc = requests.get(f'{npurl}/{str(exid)}/graph/description').json()
    print(f"ID: {exid}, Pipeline: {desc}, Status: {state['status']}")


def terminate(pipeurl):
    """delete an execution and exit non-gracefully"""
    exid = pipeurl.split('/')[-1]
    requests.delete(pipeurl)   # pipeurl already ends in the execution id
    print(f'Execution {exid} deleted.')
    exit(1)

def neuropype_available(npurl):
    try:
        requests.get(npurl)
    except IOError:
        print(f"Cannot connect to NeuroPype at {npurl}. Is it running?")
        exit(1)

def set_params(pipeurl, params, token=None):
    """set params on a loaded or running pipeline"""
    # make sure params are valid
    try:
        params = eval(params)
    except Exception as e:
        print('Error parsing parameters: %s' % e)
        terminate(pipeurl)
    resp = requests.post(pipeurl + '/actions/configure', json=params)
    if resp.status_code != 200:
        print(f'Error setting parameters: {resp.text}')
        terminate(pipeurl)
    print(f'Pipeline parameters set: {params}')


if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler)
    # parse args
    parser = argparse.ArgumentParser(description='Execute a NeuroPype pipeline. (The Neuropype '
                                                 'engine must already be running.)')
    parser.add_argument('--file', '-f', help='Full path of pipeline (.pyp) to run.')
    parser.add_argument('--play', '-p', action='store_true', help='Play a loaded pipeline or resume a previously paused pipeline. '
                                                            'You must either specify pipeline execution ID with --exid, or '
                                                            'specify a pipeline file with --file.')
    parser.add_argument('--pause', action='store_true', help='Pause a running pipeline. You must specify the pipeline '
                                                             'execution ID with --exid. This will not delete the execution. '
                                                              'You can resume the pipeline later with --play.')
    parser.add_argument('--stop', action='store_true', help='Stop a running pipeline. You must '
                                                            'specify the pipeline execution ID with --exid. This will '
                                                            'delete the execution.')
    parser.add_argument('--exid', '-e', help='The execution ID of a running pipeline. This execution ID is printed to console '
                                             'when first launching a pipeline. The special string "all" can be used to '
                                             'play, pause, or stop all running executions.')
    parser.add_argument('--state', '-s', action='store_true', help='Get the state of a running pipeline. Must be used with --exid.')
    parser.add_argument('--config', '--params', '-c',
                        help="""A JSON string (or dictionary) of parameters to pass to the pipeline, in the
                             following format: "{'param_name': 'value', 'param2_name': 'value', ...}". The keys
                             must match the names of ParameterPorts in the pipeline. You must surround the JSON
                             with double quotes. Parameters can be set on a new pipeline when --file is specified,
                             or on a running pipeline if --exid is specified.""")
    parser.add_argument('--list', '-l', action='store_true', help='List all running executions.')
    parser.add_argument('--logs', action='store_true', help='Continuously fetch logs from a running pipeline. You must specify the pipeline '
                                                            'execution ID with --exid. Press ^C to stop logs.')
    parser.add_argument('--purge', action='store_true', help='Delete all executions.')
    parser.add_argument('--clean', action='store_true', help='Delete all empty executions.')
    parser.add_argument('--port', default='6937',
                        help='Connect to Neuropype at this port. This should not '
                             'need to be changed unless Neuropype is running '
                             'on a different port than its default.')
    parser.add_argument('--host', default='127.0.0.1',
                        help='IP of the host machine where Neuropype is running. '
                             'Use 127.0.0.1 (aka localhost) if Neuropype is '
                             'running on this same machine, or another IP '
                             'if Neuropype is running on another machine on the same LAN.')
    parser.add_argument('--tickrate', type=float, default=25.0,
                        help='Nominal tickrate of the pipeline, in Hz. This would only '
                             'need to be changed in special circumstances, such as you '
                             'need the pipeline to run at an artificially '
                             'slow rate. Note that a pipeline may in practice run '
                             'at a slower tick rate depending on available computational '
                             'resources and how computationally expensive the pipeline is.')
    parser.add_argument('--token', help='Token to use for authentication, if required to load an '
                                        'encrypted pipeline')
    a = parser.parse_args()

    # the base URL for the Neuropype API server
    npurl = f'http://{a.host}:{a.port}/executions'

    # check that neuropype is running
    neuropype_available(npurl)

    # create a new execution, load a pipeline, optionally configure it, and optionally run it
    if a.file:
        exc = requests.post(npurl, json={'info': {'tickrate': a.tickrate}}).json()
        exid = str(exc['id'])
        pipeurl = npurl + '/' + exid
        payload = {'file': a.file}
        if a.token:
            payload.update({'token': a.token})
        # load the pipeline
        resp = requests.post(pipeurl + '/actions/load', json=payload)
        if resp.status_code != 200:
            print(f'Error loading pipeline: {resp.text}')
            terminate(pipeurl)
        print(f'Pipeline loaded with execution ID {exc["id"]}')
        # optionally set parameters
        if a.config:
            set_params(pipeurl, a.config, a.token)
        # optionally run the pipeline (is otherwise paused when first loaded)
        if a.play:
            requests.post(pipeurl + '/actions/play')
            print('Pipeline started.')

    # control an already loaded pipeline
    if a.exid and (a.play or a.pause or a.stop or a.logs or a.config or a.state):
        # make sure execution exists
        pipeurl = npurl + '/' + str(a.exid)
        if requests.get(pipeurl).status_code != 200:
            print(f'Execution {a.exid} not found.')
            exit(1)
        # play, pause or stop cannot be combined
        if a.play:
            requests.post(pipeurl + '/actions/play')
            print('Pipeline started or resumed.')
            pipe_info(npurl, a.exid)
        elif a.pause:
            requests.post(pipeurl + '/actions/pause')
            print('Pipeline paused.')
            pipe_info(npurl, a.exid)
        elif a.stop:
            requests.post(pipeurl + '/actions/stop')
            requests.delete(pipeurl)
            print('Pipeline stopped and execution deleted.')
        # get state
        if a.state:
            pipe_info(npurl, a.exid)
        # set params, can be combined with --play
        if a.config:
            # set the params
            set_params(pipeurl, a.config)
        # fetch logs continuously
        if a.logs:
            print(f'Fetching logs for pipeline {a.exid}. ^C to exit at any time.')
            fetch_logs(pipeurl)
        exit(0)

    # list all executions and show their name and state
    if a.list:
        excs = requests.get(npurl).json()
        print('Running executions:')
        if not excs:
            print('None')
        for exc in excs:
            exid = exc['id']
            # fetch state
            pipe_info(npurl, exid)
        exit(0)

    # delete all executions
    if a.purge:
        excs = requests.get(npurl).json()
        for exc in excs:
            requests.delete(f"{npurl}/{exc['id']}")
            print(f'Deleted execution {exc["id"]}')
        exit(0)

    # delete all empty executions
    if a.clean:
        excs = requests.get(npurl).json()
        for exc in excs:
            if 'state' not in exc:
                requests.delete(f"{npurl}/{exc['id']}")
                print(f'Deleted empty execution {exc["id"]}')
        exit(0)

Copyright (c) Syntrogi Inc. dba Intheon. All Rights Reserved.