
Locking critical code in a single thread


#1

I have an application where I'm reading several serial ports, each port with its own port event handler. I'd like to log to my SD card every message that comes in on every serial port, as well as some activity that is going on elsewhere in my program. I'm locking each call to the method in my logger class that writes to the SD card, but I see some evidence that I'm not really locking the code. I've only got one thread in my program, but my understanding is that .NET MF executes the event handlers in another thread.

Here's the code. The portDataReceivedHandler reads all the characters available at the port and stores them in a buffer. When I see the end-of-line character, I call the parseAndWriteLog method, which calls the Logger.write method inside a lock. There are two other portDataReceivedHandler methods for other serial ports that work the same way, and I need to make sure they aren't stepping on each other when writing to SD. Does this seem like a scheme that should work? Comments and other ideas will be appreciated.

Thanks - Gene

```cs

       private static void portDataReceivedHandler(object sender, SerialDataReceivedEventArgs e)
        {
            portBytesToRead = SBE41Port.BytesToRead;

            try
            {
                SBE41Port.Read(portReadBytes, 0, portBytesToRead);

            }
            catch
            {
                Debug.Print("SBE41 Read Exception");          //TODO do something smarter here
            }

            for (int i = 0; i < portBytesToRead; i++)
            {
                if (portInBytesIndex == -1)    //Check to make sure first character is = LF
                {
                    if (portReadBytes[i] == 10)  //only move on to byte index 1 after we get a LF
                    {
                        portInBytesIndex = portInBytesIndex + 1;
                    }
                }
                else
                {
                    portInBytes[portInBytesIndex] = portReadBytes[i];
                    if (portInBytes[portInBytesIndex] == 13)
                    {
                        messageNum = messageNum + 1;
                        parseAndWriteLog(portInBytes);
                     
                        Array.Clear(portReadBytes, 0, portReadBytes.Length);
                        Array.Clear(portInBytes, 0, portInBytes.Length);
                        portInBytesIndex = -1;
                    }
                    else
                    {
                        portInBytesIndex = portInBytesIndex + 1;
                    }
                }
            }
        }

        public static void parseAndWriteLog(byte[] inArray)          
        {
            inArrayLength = Array.IndexOf(inArray, 0);

            parserI = Array.IndexOf(inArray, token);

            while ((inArray[parserI] != CR) && (inArray[parserI] != comma) && (parserI < inArrayLength))
            {
                pressureBytes[pressureBytesIndex] = inArray[parserI];
                pressureBytesIndex = pressureBytesIndex + 1;
                parserI = parserI + 1;
            }
            sb.Clear();
            sb.Append(UTF8Encoding.UTF8.GetChars(pressureBytes));
            pressure = double.Parse(sb.ToString());
            lock (SBE41Lock) { Logger.write(Logger.ColumnNums.comment, "SBE 41 Message", Logger.ColumnNums.pressure, pressure); }
            Array.Clear(pressureBytes, 0, pressureBytes.Length);
            pressureBytesIndex = 0;
        }
```


#2

Can you show the other two port event handlers? I find the statement

 misleading. It feels like you are using multiple "parseAndWriteLog" functions, too. Otherwise you wouldn't name the lock object ("SBE41Lock") after the specific serial port, right? :)

Now, if you are using a single parse function, you're in trouble, too: you are locking only the actual write to the SD card, but the variables *inside* the parser function are still shared. Judging from the code you provided, you declared "inArrayLength", "parserI", "pressure" and others somewhere else. So, for example, Port1 may get the lock, but Port2 might change the pressure variable just before Port1 writes it to the SD card.

I suggest locking the whole parseAndWriteLog function, something like this:

```cs
messageNum = messageNum + 1;
lock (_theLockObject) { parseAndWriteLog(portInBytes); }

Array.Clear(portReadBytes, 0, portReadBytes.Length);
Array.Clear(portInBytes, 0, portInBytes.Length);
portInBytesIndex = -1;
```
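
To illustrate the point about shared variables, here is a minimal sketch (not the original poster's code) in which the parser keeps all of its working state in locals and every caller serializes the final write on one shared lock object. The names `LogGate`, `LogEntry`, `ParseAndWrite` and `Written` are hypothetical stand-ins, since the thread does not show `Logger.write`. It is written against desktop .NET; the culture argument to `double.Parse` may not be available on .NET MF.

```cs
using System;
using System.Collections;
using System.Globalization;
using System.Text;

// Hypothetical record of one logged value.
public class LogEntry
{
    public string Source;
    public double Value;
}

public static class LogGate
{
    // One lock object shared by every port handler (assumed design).
    private static readonly object gate = new object();

    // Stand-in for the SD card; the real Logger.write is not shown in the thread.
    public static readonly ArrayList Written = new ArrayList();

    // Parses the field between the first space and the next comma or CR.
    // All working state is local, so concurrent callers cannot overwrite
    // each other's values; only the final write is serialized.
    public static void ParseAndWrite(string source, byte[] line, int length)
    {
        int start = Array.IndexOf(line, (byte)' ', 0, length);
        if (start < 0) return;      // no token found: nothing to parse
        start++;                    // skip the space itself

        int end = start;
        while (end < length && line[end] != (byte)'\r' && line[end] != (byte)',')
        {
            end++;
        }

        string text = new string(Encoding.UTF8.GetChars(line, start, end - start));
        double value = double.Parse(text, CultureInfo.InvariantCulture);

        LogEntry entry = new LogEntry();
        entry.Source = source;
        entry.Value = value;

        lock (gate)
        {
            Written.Add(entry);
        }
    }
}
```

With locals, the lock only has to cover the shared resource (here the `Written` list), which keeps the time spent holding it short.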

#3

As far as I know, for NETMF and Gadgeteer your assumption is not right. If you have only one thread, all your event handlers are executed on the same thread, and one does not start before the other has finished. So I think you need no locking.

I would not parse for CRLF in the event handler, but would use a separate instance of a buffer class, as shown in this thread:
https://www.ghielectronics.com/community/forum/topic?id=14821&page=2
Post #17
Also interesting to read is James's post #13 in the same thread (no locking is needed to do this :) )

Cheers
Roland


#4

Thanks for the quick responses. I don't have access to my code right now, but I'll post the other event handlers when I get back to work. In the meantime, I'll try to describe what I'm doing in words. I have a separate class for every sensor attached to a serial port. Each class sets up its own serial port and has its own event handler, its own global variables like "inArrayLength", "parserI" and "pressure", its own lock object, and its own parseAndWrite method. There is only one logger class, with a write-to-SD method that gets called by each sensor class's parse-and-write method.

By the way, I’m only using plain vanilla .Net, no Gadgeteer functions. I’d be interested in your comments on whether it’s easier to write robust applications in .Net vs Gadgeteer.

Thanks again.


#5

@Gene - As far as I know, all serial DataReceived events are executed one after another on the same thread. That is, if there are 4 serial ports, each subscribed to its own DataReceived event, and one or more events are raised at the "same" time, only one event handler at a time will actually run. The other event handlers will wait until the current one has finished, each waiting its turn in the order in which their events were raised.


#6

This is correct. That is why, with serial ports, you must minimize the processing done in the event handler.
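
A minimal sketch of what "minimize the processing done in the event handler" can look like, assuming a design where the handler only copies the received bytes into a queue and a separate worker does the slow work. `ReceivePump`, `OnBytes`, `Drain` and `Run` are hypothetical names, and `BytesProcessed` merely stands in for parsing and SD writes. Allocating a new array per chunk is a simplification; on a small device a preallocated buffer pool may be preferable.

```cs
using System;
using System.Collections;
using System.Threading;

public class ReceivePump
{
    private readonly Queue pending = new Queue();   // raw chunks awaiting processing
    private readonly object gate = new object();    // guards 'pending'
    private readonly AutoResetEvent signal = new AutoResetEvent(false);

    public int BytesProcessed;   // stand-in for the slow work (parse, SD write)

    // Call this from the DataReceived handler: copy, enqueue, return.
    public void OnBytes(byte[] data, int count)
    {
        byte[] chunk = new byte[count];
        Array.Copy(data, chunk, count);
        lock (gate) { pending.Enqueue(chunk); }
        signal.Set();
    }

    // Processes everything queued so far; returns the number of chunks handled.
    public int Drain()
    {
        int handled = 0;
        while (true)
        {
            byte[] chunk;
            lock (gate)
            {
                if (pending.Count == 0) break;
                chunk = (byte[])pending.Dequeue();
            }
            BytesProcessed += chunk.Length;   // slow work happens outside the lock
            handled++;
        }
        return handled;
    }

    // Body for a worker thread: sleep until signalled, then drain the queue.
    public void Run()
    {
        while (true)
        {
            signal.WaitOne();
            Drain();
        }
    }
}
```

A worker would be started with something like `new Thread(pump.Run).Start();`, so the handler returns quickly and the next port's event is not held up by a slow SD write.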


#7

OK, thanks to all your comments I think I’m back on track.

The previous version of this code didn't "parse and write" in the serial port event handler. I have a byte array queue (thanks to @taylorza). When the event handler got the CR, it dumped the message onto the queue, and the queue processor took care of calling the right parser for each serial port message. I do lock the call to enqueue a message, since it is possible in my program for a timer event handler (which I'm pretty sure does run in its own thread, right?) to also enqueue a message.

After a message is parsed, the queue processor calls the SD write method and it is the only place where writing the SD card happens. I went down the “no queue” track after I’d gotten the parsers to be much, much simpler and neglected the fact that writing to SD is pretty slow. I’m back to using the queue and everything seems to be working normally, at least for now.

I do have a question for @RoSchmi, if you don't mind. I've relearned the lesson that we're supposed to do the bare minimum in serial port event handlers, but I still look for the start-of-text character (LF) and end-of-text character (CR) in my event handler before passing the message to the queue. I looked hard at your code and can see that you pass every byte received at the event handler to your SerialBuffer without really looking at it. Where do you process the SerialBuffer? I could do this, but I'd have a SerialBuffer for every port, which is doable but kind of awkward. Do you think the relatively small additional steps I do in the event handler will be an issue?
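
One way to make a per-port buffer less awkward is to put the LF...CR framing in a small instance class, so that each port owns one object instead of a set of static fields. The following is a sketch under that assumption; `LineFramer` and `Push` are hypothetical names and this is not the SerialBuffer class referred to in the thread.

```cs
using System;

public class LineFramer
{
    private const byte STX = 10;   // LF starts a message, as in the thread
    private const byte ETX = 13;   // CR ends it

    private readonly byte[] buffer;
    private int index = -1;        // -1 means "still waiting for STX"

    public LineFramer(int capacity)
    {
        buffer = new byte[capacity];
    }

    // The payload of the most recently completed message starts at Buffer[0].
    public byte[] Buffer { get { return buffer; } }

    // Feeds one received byte. Returns the payload length when ETX completes
    // a message, otherwise -1.
    public int Push(byte b)
    {
        if (index == -1)
        {
            if (b == STX) index = 0;   // ignore everything until STX arrives
            return -1;
        }
        if (b == ETX)
        {
            int length = index;
            index = -1;                // ready for the next message
            return length;
        }
        if (index >= buffer.Length)
        {
            index = -1;                // overlong message: drop it and resynchronize
            return -1;
        }
        buffer[index++] = b;
        return -1;
    }
}
```

The event handler for each port would then loop over the bytes it read, call `Push` on that port's framer, and enqueue the payload whenever the return value is zero or greater. The bounds check also guards against a message that never sends its CR.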

I’ve posted my code below if anyone is interested in the gory details. Any comments will still be greatly appreciated.

Thanks for all the help

Here is the SBE41 class that reads the data from the instrument. This gizmo is kind of old and requires some carefully timed hardware handshaking before it spits out data over its serial port. I do that with a timer, and I think the timer event handler may have been part of my previous problem.


```cs
using System;
using Microsoft.SPOT;

using System.Text;
using System.Threading;
using System.IO.Ports;

using Microsoft.SPOT.Hardware;
using Microsoft.SPOT.IO;

using GHI.Hardware.EMX;

namespace miniCPF
{
    class SBE41
    {
        private static SerialPort SBE41Port = new SerialPort("com4", 9600, Parity.None, 8, StopBits.One);

        private static Timer SBE41Timer = null;

        private static OutputPort DR = new OutputPort(Pin.IO15, false);
        private static OutputPort SPM = new OutputPort(Pin.IO46, true);
        private static OutputPort serialTX;

        private static double pressure = Double.NaN;

        private static byte[] pressureBytes = new byte[16];
        private static byte[] portReadBytes = new byte[1024];
        private static byte[] portInBytes = new byte[240];
        private static byte[] header = UTF8Encoding.UTF8.GetBytes("CTD ");
        private static byte[] message = new byte[256];

        private static byte space = 32;
        private static byte CR = 13;
        private static byte LF = 10;
        private static byte comma = 44;
        private static byte STX = LF;
        private static byte ETX = CR;
        private static byte token = space;

        private static int pressureBytesIndex = 0;
        private static int portBytesToRead;
        private static int portInBytesIndex = -1;
        private static int messageNum;
        private static int timerState = 0;
        private static int parserI = 0;
        private static int inArrayLength = 0;

        private static StringBuilder sbTemp = new StringBuilder(256);

        private static Object SBE41Lock = new Object();

        public static void Init()
        {
            SBE41Port.DataReceived += new SerialDataReceivedEventHandler(portDataReceivedHandler);

            //Initialize();
            SBE41Port.ReadTimeout = 5000;
            SBE41Port.WriteTimeout = 20;
            SBE41Port.Open();

            //ReleaseWakeUp();
            DR.Write(false);
            SPM.Write(true);

            SBE41Timer = new Timer(new TimerCallback(SBE41TimerCallback), null, 500, 2000);
            //Stop the timer by set SBE41Timer = null

            Array.Copy(header, message, header.Length);
        }

        private static void SBE41TimerCallback(object state)
        {
            switch (timerState)
            {
                case 0:
                    SBE41Port.Close();
                    serialTX = new OutputPort(Pin.IO6, true);
                    DR.Write(true);
                    SPM.Write(false);
                    // Debug.Print(DateTime.Now + "." + DateTime.Now.TimeOfDay.Milliseconds.ToString() + "  SBE41 State 0  DR = true   SM = false   TX = true");
                    timerState = 1;
                    SBE41Timer.Change(0, 50);
                    break;
                case 1:
                    DR.Write(false);
                    // Debug.Print(DateTime.Now + "." + DateTime.Now.TimeOfDay.Milliseconds.ToString() + "  SBE41 State 1  DR = false  SM = false   TX = true");
                    timerState = 2;
                    SBE41Timer.Change(0, 150);
                    break;
                case 2:
                    serialTX.Write(false);
                    // Debug.Print(DateTime.Now + "." + DateTime.Now.TimeOfDay.Milliseconds.ToString() + "  SBE41 State 2  DR = false  SM = false   TX = false");
                    timerState = 3;
                    SBE41Timer.Change(0, 150);
                    break;
                case 3:
                    SPM.Write(true);
                    serialTX.Write(true);
                    //Debug.Print(DateTime.Now + "." + DateTime.Now.TimeOfDay.Milliseconds.ToString() + "  SBE41 State 3  DR = false  SM = true    TX = true");
                    timerState = 4;
                    SBE41Timer.Change(0, 50);
                    break;
                case 4:
                    serialTX.Dispose();
                    SBE41Port.Open();
                    //Debug.Print(DateTime.Now + "." + DateTime.Now.TimeOfDay.Milliseconds.ToString() + "  SBE41 State 3  DR = false  SM = true    TX = true");
                    timerState = 0;
                    SBE41Timer.Change(0, 1600);
                    break;
            }
        }

        private static void portDataReceivedHandler(object sender, SerialDataReceivedEventArgs e)
        {
            portBytesToRead = SBE41Port.BytesToRead;

            try
            {
                SBE41Port.Read(portReadBytes, 0, portBytesToRead);

            }
            catch
            {
                Debug.Print("SBE41 Read Exception");          //TODO do something smarter here
            }

            for (int i = 0; i < portBytesToRead; i++)
            {
                if (portInBytesIndex == -1)    //Check to make sure first character is = LF
                {
                    if (portReadBytes[i] == STX)  //only move on to byte index 1 after we get the STX character
                    {
                        portInBytesIndex = portInBytesIndex + 1;
                    }
                }
                else
                {
                    portInBytes[portInBytesIndex] = portReadBytes[i];
                    if (portInBytes[portInBytesIndex] == ETX)       //process the message when we get the ETX character
                    {
                        messageNum = messageNum + 1;

                        Array.Copy(portInBytes, 1, message, header.Length, portInBytesIndex);
                        lock (SBE41Lock) { Program.MessageQueue.Enqueue(message); }

                        Array.Clear(portReadBytes, 0, portReadBytes.Length);
                        Array.Clear(portInBytes, 0, portInBytes.Length);
                        portInBytesIndex = -1;
                    }
                    else
                    {
                        portInBytesIndex = portInBytesIndex + 1;
                    }
                }
            }
        }

        public static double parse(byte[] inArray)
        {
            inArrayLength = Array.IndexOf(inArray, 0);

            parserI = Array.IndexOf(inArray, token);

            while ((inArray[parserI] != CR) && (inArray[parserI] != comma) && (parserI < inArrayLength))
            {
                pressureBytes[pressureBytesIndex] = inArray[parserI];
                pressureBytesIndex = pressureBytesIndex + 1;
                parserI = parserI + 1;
            }
            sbTemp.Clear();
            sbTemp.Append(UTF8Encoding.UTF8.GetChars(pressureBytes));
            pressure = double.Parse(sbTemp.ToString());
            Array.Clear(pressureBytes, 0, pressureBytes.Length);
            pressureBytesIndex = 0;
            return (pressure);
        }
    }
}
```

Here is my ByteQueue class

```cs
// Modified from @ taylorza on GHI Electronics Forum
// https://www.ghielectronics.com/community/forum/topic?id=8083&page=2#msg79627

using System;
using Microsoft.SPOT;

using System.Text;

namespace miniCPF
{
    public class ByteQueue
    {
        private static byte[][] buffer;
        private static byte[] returnArray;
        private static int head;
        private static int tail;
        private static int count;
        private static int capacity;
        private static int i;

        private const int defaultCapacity = 512;
        private const int defaultMessageLength = 256;

        public ByteQueue()
        {
            buffer = new byte[defaultCapacity][];
            for (i = 0; i < defaultCapacity; i++)
            {
                buffer[i] = new byte[defaultMessageLength];
            }

            returnArray = new byte[defaultMessageLength];

            capacity = defaultCapacity;
            head = 0;
            tail = 0;
            count = 0;
        }

        public int Count
        {
            get { return count; }
        }

        public void Clear()
        {
            count = 0;
            tail = head;
        }

        public void Enqueue(byte[] byteArray)
        {
            //GM I've taken out the ability to grow since if the queue does have to grow, something is wrong
            //if (count == capacity)
            //{
            //  Grow();
            //}
            //buffer[head] = value;
            if (count >= capacity)
            {
                throw new InvalidOperationException("Queue is full");
            }
            else
            {
                Array.Copy(byteArray, buffer[head], byteArray.Length);
                head = (head + 1) % capacity;
                count++;
            }
        }

        public byte[] Dequeue()
        {
            Array.Clear(returnArray, 0, returnArray.Length);
            if (count == 0)
            {
                throw new InvalidOperationException("Queue is empty");
            }
            else
            {
                //      byte value = buffer[tail];
                Array.Copy(buffer[tail], returnArray, buffer[tail].Length);

                tail = (tail + 1) % capacity;
                count--;
                return returnArray;
            }
        }

        //public int Dequeue(byte[] bytes)
        //{
        //  int maxBytes = System.Math.Min(count, bytes.Length);
        //  for (i = 0; i < maxBytes; i++)
        //  {
        //    bytes[i] = Dequeue();
        //  }
        //  return maxBytes;
        //}

        //private void Grow()
        //{
        //  int newCapacity = capacity << 1;
        //  byte[] newBuffer = new byte[newCapacity];

        //  if (tail < head)
        //  {
        //    Array.Copy(buffer, tail, newBuffer, 0, count);
        //  }
        //  else
        //  {
        //    Array.Copy(buffer, tail, newBuffer, 0, capacity - tail);
        //    Array.Copy(buffer, 0, newBuffer, capacity - tail, head);
        //  }

        //  buffer = newBuffer;
        //  head = count;
        //  tail = 0;
        //  capacity = newCapacity;
        //}
    }
}
```

And here is a snippet from my Main that processes the queue

```cs
        private static void processMessageQueue()
        {
            do
            {
                Array.Clear(dequeueMessage, 0, dequeueMessage.Length);
                lock (programLock) { dequeueMessage = Program.MessageQueue.Dequeue(); }
                if ((dequeueMessage[0] == (byte)'C') && (dequeueMessage[1] == (byte)'T') && (dequeueMessage[2] == (byte)'D'))     //TODO Probably need a more robust way to do this
                {
                    pressure = SBE41.parse(dequeueMessage);
                    Logger.write(Logger.ColumnNums.comment, "SBE 41 Message", Logger.ColumnNums.pressure, pressure);
                }
                else
                {
                    sbTemp.Clear();
                    sbTemp.Append("Generic Queue Message");
                    sbTemp.Append(dequeueMessage);
                    Logger.write(Logger.ColumnNums.comment, sbTemp);
                }
            } while (Program.MessageQueue.Count > 0);
        }
```


#8

Hi Gene,
your task is too complex to give a valid answer in a minute, but it's very interesting.
Here is the link to the original post of the SerialBuffer class.

https://www.ghielectronics.com/community/codeshare/entry/91

There may be better solutions, but I find it a very smart little "black box": you feed chunks of data in and get something back only when the string is complete.
In principle, using this SerialBuffer class is not faster than looking for CRLF in the event handler itself. I also think the SerialBuffer class cannot guarantee that the first string is caught complete from its beginning.
I hope to find some time in the next few days to take a closer look at your code.

Cheers Roland


#9

Roland

If you have the time and interest to look at it that would be great. Regardless, your help has been invaluable already.

Thanks again - Gene


#10

I may be all wet, Gene, but if control of the log queue and the writing to the SD card is done in a static singleton with locking in place, and no one else can queue or dequeue messages, then there shouldn't be any way to stomp on your data.

But that might mean moving all the logic for updating the queue to a central spot, if it’s not already there.

I have trouble figuring out what is happening in code I wrote, let alone what someone else has done.


#11

The way I've structured the code, lots of methods can enqueue data. All the dequeueing and subsequent writes to disk are done in one place. Some of the enqueueing is done by timer event handlers, which I'm pretty sure are handled in a separate thread that .NET MF sets up behind the scenes, so all the enqueueing and dequeueing is done with locks. Hopefully this makes it a little more clear.
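
One detail worth keeping in mind with this design: a `lock` only excludes other code that locks the same object. In the code posted earlier in the thread, the enqueue is wrapped in `lock (SBE41Lock)` while the dequeue is wrapped in `lock (programLock)`; if those are different objects, as they appear to be, the two sides do not exclude each other. A minimal sketch of one way around this, assuming the locking is moved inside the queue so that every caller shares one lock automatically (`LockedMessageQueue`, `TryEnqueue` and `TryDequeue` are hypothetical names, not part of the posted ByteQueue):

```cs
using System;

public class LockedMessageQueue
{
    private readonly byte[][] slots;
    private readonly object gate = new object();   // the only lock guarding head, tail and count
    private int head, tail, count;

    public LockedMessageQueue(int capacity, int messageLength)
    {
        slots = new byte[capacity][];
        for (int i = 0; i < capacity; i++) slots[i] = new byte[messageLength];
    }

    // Copies 'length' bytes of the message into the next free slot.
    // Returns false when the queue is full or the message is too long.
    public bool TryEnqueue(byte[] message, int length)
    {
        lock (gate)
        {
            if (count == slots.Length || length > slots[head].Length) return false;
            Array.Clear(slots[head], 0, slots[head].Length);
            Array.Copy(message, slots[head], length);
            head = (head + 1) % slots.Length;
            count++;
            return true;
        }
    }

    // Copies the oldest message into 'destination', which must be at least
    // one slot long. Returns false when the queue is empty.
    public bool TryDequeue(byte[] destination)
    {
        lock (gate)
        {
            if (count == 0) return false;
            Array.Copy(slots[tail], destination, slots[tail].Length);
            tail = (tail + 1) % slots.Length;
            count--;
            return true;
        }
    }
}
```

Returning `false` instead of throwing also lets the consumer loop with `while (queue.TryDequeue(buffer))`, which avoids dequeueing from an empty queue.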


#12

OK, as I understand it, your code now does what you expect. Congratulations, then :clap:
Cheers
Roland