Creating a new Source Node in C# (e.g. adding a new camera driver to Bonsai)

952 views
Skip to first unread message

Adam Kampff

unread,
May 27, 2015, 5:07:14 PM5/27/15
to bonsai...@googlegroups.com
Hi Bonsaiers,
   I'd like to start a thread about creating new "Source" nodes. There is an excellent tutorial for creating "Transforms" on the Wiki (https://bitbucket.org/horizongir/bonsai/wiki/ImageProcessingTransformTutorial), but Sources appear to require a bit more Rx knowledge. This will be particularly useful for adding new hardware drivers to Bonsai (e.g. cameras).

  To help get started, it would be awesome if Goncalo could post an "annotated" example for the source code of the "CameraCapture.cs" node, which we all know and love. There is a lot going on in this short file, and I'd love a brief "walkthrough" by the Master.

  In return, I am working on a driver for NI-IMAQ CameraLink framegrabbers, and I'll post my progress here. Hopefully, we can all then "kick-start" the extension of Bonsai to new devices. Kinect would be cool...IoT stuff as well!

Cheers,
Adam 


Adam Kampff

unread,
May 29, 2015, 9:12:22 AM5/29/15
to bonsai...@googlegroups.com
Some progress...

Attached is an NI-IMAQ Source node that functions. Also, a screenshot of it running smoothly at 500Hz (Cameralink framegrabber (PCIe-1433) and Mikrotron EOSens)..and a gratuitous video of very slow fish.

There is much to improve (and understand) about what is going on...but a start nonetheless.

Cheers,
Adam

- - -
"The Code"

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using OpenCV.Net;
using System.Threading;
using System.Reactive.Linq;
using System.ComponentModel;
using System.Reactive.Disposables;
using System.Threading.Tasks;
using System.Runtime.InteropServices;
using Bonsai.Vision;
using NationalInstruments.Vision;
using NationalInstruments.Vision.Acquisition.Imaq;

namespace Bonsai.Pedro
{
    [Description("Garbs a continuous video sequence of images acquired from the specified IMAQ camera name.")]
    public class IMAQ_Camera : Source<IplImage>
    {
        // Class Variables
        IObservable<IplImage> source;
        readonly object captureLock = new object();

        // Constructor: Creates an IObservable source stream in a seperate thread (async Task)
        public IMAQ_Camera()
        {
            // Set Default Values
            cameraName = "img0";
            framweWidth = 1280;
            frameHeight = 1024;

            // The "source" is an IObservable stream if Images...created here
            source = Observable.Create<IplImage>((observer, cancellationToken) =>
            {
                return Task.Factory.StartNew(() =>
                {
                    // This protects from asynchronous collisions (i.e. locks the block of code to prevent access by another thread)
                    lock (captureLock)
                    {

                        // The "using" statement just creates a local scope for the "session" parameter, which is an NI-IMAQ capture handle
                        using (var _session = new ImaqSession(cameraName))
                        {
                            try
                            {
                                // Create pointless NI Datatypes
                                byte[,] data_2D = new byte[frameHeight, framweWidth]; // Rows by Columns
                                byte[] data_1D = new byte[framweWidth * frameHeight];
                                NationalInstruments.Vision.PixelValue2D valArray = new NationalInstruments.Vision.PixelValue2D(data_2D);

                                //  Start a Grab acquisition.
                                _session.GrabSetup(true);

                                // Create IPL Image
                                var frameSize = new Size(framweWidth, frameHeight);
                                var frame = new IplImage(frameSize, IplDepth.U8, 1);

                                // Loop until something cancels
                                while (!cancellationToken.IsCancellationRequested)
                                {

                                    //  Get the latest image in NI Format
                                    _session.Grab(valArray, true);

                                    // Copy to an intermediate 1-D representation (waste of time!)
                                    Buffer.BlockCopy(data_2D, 0, data_1D, 0, framweWidth * frameHeight);

                                    // Copy to IplImage
                                    Marshal.Copy(data_1D, 0, frame.ImageData, framweWidth * frameHeight);

                                    // If nothing comes back, then say the (IObservable) stream is "completed" and stop.
                                    if (valArray == null)
                                    {
                                        observer.OnCompleted();
                                        break;
                                    }
                                    else
                                    {
                                        //  If there is a frame, then do shit to the frame tand see what happens
                                        observer.OnNext(frame.Clone());
                                    }
                                }
                            }
                            // At the end, close the stream.
                            finally 
                            {
                                // Close the session.
                                _session.Close();
                            }
                        }
                    }
                },
                cancellationToken,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default);
            })
            .PublishReconnectable()
            .RefCount();
        }

        // Methods for accesing properties in the Property window
        [Description("The camera's IMAQ name.")]
        public string cameraName { get; set; }
        [Description("The frame width.")]
        public int framweWidth { get; set; }
        [Description("The frame height.")]
        public int frameHeight { get; set; }


        // ?
        public override IObservable<IplImage> Generate()
        {
            return source;
IMAQ_Camera.cs
IMAQ_Bonsai_Screenshot.png
IMAQ_FishTest_500Hz_clip.avi

goncaloclopes

unread,
Jun 27, 2015, 2:51:03 PM6/27/15
to bonsai...@googlegroups.com, adam....@gmail.com
Wow, this is beautiful! The IMAQ source looks great. We should probably package it into NuGet to make this accessible online, I will look into that.

About the CameraCapture code, yes, there's a lot going on. Essentially the majority of the code is about making execution robust to all kinds of abuse the user can come up with (robustness is always a tricky problem ;-)).

Anyway, here's my commented version of CameraCapture.cs, for future reference:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using OpenCV.Net;
using System.Threading;
using System.Reactive.Linq;
using System.ComponentModel;
using System.Reactive.Disposables;
using System.Threading.Tasks;

namespace Bonsai.Vision
{
    // Attribute metadata. Description attributes specify strings that will be included as
   
// documentation in the editor

   
[Description("Produces a video sequence of images acquired from the specified camera index.")]
   
public class CameraCapture : Source<IplImage>
   
{
        // Only one exclusive connection can be made to a hardware camera object. However, many
       
// observers may want to connect to the camera. Because of this, the strategy for the
       
// CameraCapture node is to create and share the single connection between all observers.
       
// When the first observer subscribes to the source, the connection is created. When the
       
// last observer unsubscribes, the connection is destroyed.
       
// More details below.

       
IObservable<IplImage> source;

        // This lock is only necessary because of asynchronous creation and destruction of the
       
// camera connection. In case we are rapidly creating and destroying camera connections
       
// we want to wait until the last connection is destroyed before we create a new one.
       
// This lock ensures that.

       
readonly object captureLock = new object();


       
// Properties of the camera. This is mostly OpenCV details.
       
readonly CapturePropertyCollection captureProperties = new CapturePropertyCollection();

       
public CameraCapture()
       
{
            // Here we define our observable source. An observable is just a (possibly asynchronous)
           
// sequence of events that gets propagated to an arbitrary number of downstream observers.
           
// You can think about it as an iterator/generator/list/collection (pick your poison)
           
// with the difference that instead of consumers having to pull out items from it, they
           
// get pushed as notifications to each observer.
           
//
           
// One important point is that observable sequences only actually need to do something
           
// when an observer subscribes to it.
           
//
           
// The Observable.Create method makes it easy to create one of these sequences, by simply
           
// specifying a function that gets called for each observer. Inside this function, you can
           
// send notifications to the observer by calling observer.OnNext(). Also, you need to specify
           
// what happens if the observer cancels the subscription at any time (e.g. the workflow is
           
// stopped). There are many possible overloads to Observable.Create, but in this case, we
           
// are defining the sequence in terms of a Task that starts and stops tied to a particular
           
// observer subscription. The cancellationToken variable allows us to be notified when the
           
// observer cancelled the subscription.

            source
= Observable.Create<IplImage>((observer, cancellationToken) =>
           
{
                // Here we simply start the task that will emit the notifications to the observer
               
return Task.Factory.StartNew(() =>
               
{
                    // We wait until any previous connections are completely disposed.
                   
lock (captureLock)
                   
{
                        // We create the camera connection
                       
using (var capture = Capture.CreateCameraCapture(Index))
                       
{
                            // Apply the settings
                           
foreach (var setting in captureProperties)
                           
{
                                capture
.SetProperty(setting.Property, setting.Value);
                           
}
                            captureProperties
.Capture = capture;
                           
try
                           
{
                                // Loop until the observer has cancelled the subscription
                               
while (!cancellationToken.IsCancellationRequested)
                               
{
                                    // Read one image
                                   
var image = captureProperties.Capture.QueryFrame();
                                   
if (image == null)
                                   
{
                                        // If the next image is null, the camera was somehow stopped,
                                       
// so we signal the observer that the sequence has ended.
                                       
// This mostly never happens, but just to be sure.

                                        observer
.OnCompleted();
                                       
break;
                                   
}
                                    // Otherwise, send a copy of the image to the observer. The reason we
                                   
// send a copy is that acquisition of the next frame will overwrite the
                                   
// original image; this is a problem if the observer cached the image
                                   
// somewhere for future use.

                                   
else observer.OnNext(image.Clone());
                               
}
                           
}
                            // Make sure we reset the capture property to null at the end
                           
finally { captureProperties.Capture = null; }
                       
}
                   
}
               
},
                // These next parameters specify the operation of the Task. We give it the token, so that
               
// the task is cancelled in case the observer unsubscribes. We also indicate the Task is long
               
// running so that the framework allocates a dedicated thread to it, rather than a worker thread
               
// from the thread pool. The last parameter just assigns it the default scheduler.

                cancellationToken
,
               
TaskCreationOptions.LongRunning,
               
TaskScheduler.Default);
           
})
            // The next two methods make this source a shared (hot) source. This ensures there is only one observer
           
// subscribed to the camera and all notifications are distributed among all other observers.
           
// RefCount ensures that the connection is only made when there are actually observers.

           
.PublishReconnectable()
           
.RefCount();
       
}

        // Properties for the property window
       
[Description("The index of the camera from which to acquire images.")]
       
public int Index { get; set; }

        // Properties for the property window
       
[Description("Specifies the set of capture properties assigned to the camera.")]
       
public CapturePropertyCollection CaptureProperties
       
{
           
get { return captureProperties; }
       
}

        // Since we have defined our observable source in the constructor (because we are sharing it), here we
       
// just need to return that source.

       
public override IObservable<IplImage> Generate()
       
{
           
return source;
       
}
   
}
}


Hope this helps. There's still lots going on, I know, but hopefully more things are clear. Writing sources I would say is the most challenging aspect of extending Bonsai, so I agree there should definitely be a tutorial...

In the meantime checking out other sources is a good idea, like CameraCapture, AudioCapture, FunctionGenerator and so on.

Also more detailed information about observables and Reactive (Rx) in general at:

Cheers!

Eirinn Mackay

unread,
May 9, 2016, 7:45:28 AM5/9/16
to Bonsai Users, adam....@gmail.com
Hi Goncalo and Adam,
Did you ever get around to packaging this for NuGet? I would like to use this code for my 1000FPS Mikrotron camera!
Cheers,
Eirinn
Reply all
Reply to author
Forward
0 new messages