Celso Axelrud

Computer Science Experimentation

Wednesday, July 15, 2015

HIGH PERFORMANCE REAL-TIME STOCKS COMPUTATION ENGINE - part 1

HIGH PERFORMANCE REAL-TIME STOCKS COMPUTATION ENGINE - part 1

HIGH PERFORMANCE REAL-TIME STOCKS COMPUTATION ENGINE - part 1

This blog explores a High Performance Real-Time Stocks Computation Engine for data collection and processing. It can be the basis of a Real-time Trading System.

High Performance is obtained by using Reactive and Non-blocking code.

The code has the following properties:

As an overall pattern, the PUSH pattern is the preferred style, eventhough, the example, presented here, starts by, periodicly, pull data from the web.

The main reason is related to the fact that the communication with other processes/computers is expected to use Publish/Subscribe, and this is a PUSH pattern.

Example steps

Define references to the libraries.

For Reactive Extensions.

1: 
2: 
3: 
4: 
5: 
6: 
#r @"C:\Project(comp)\Dev_2015\Stocks_1\Appl1\lib\System.Reactive.Core.dll"
#r @"C:\Project(comp)\Dev_2015\Stocks_1\Appl1\lib\System.Reactive.Interfaces.dll"
#r @"C:\Project(comp)\Dev_2015\Stocks_1\Appl1\lib\System.Reactive.Linq.dll"
#r @"C:\Project(comp)\Dev_2015\Stocks_1\Appl1\lib\System.Reactive.PlatformServices.dll"
#r @"C:\Project(comp)\Dev_2015\Stocks_1\Appl1\lib\System.Reactive.Providers.dll"
#r @"C:\Project(comp)\Dev_2015\Stocks_1\Appl1\lib\Microsoft.Reactive.Testing.dll"

For FSharp.Control.Reactive.

1: 
#r @"C:\Project(comp)\Dev_2015\Stocks_1\Appl1\lib\FSharp.Control.Reactive.dll" //Need to be the last reference

Open the libraries.

1: 
2: 
3: 
4: 
open System
open System.Collections.Generic
open System.Threading
open System.Net

Open Reactive Extensions libraries.

1: 
2: 
3: 
4: 
5: 
6: 
open System.Reactive
open System.Reactive.Linq
open System.Reactive.Disposables
open System.Reactive.Concurrency
open System.Reactive.Subjects
open Microsoft.Reactive.Testing

Open FSharp.Control.Reactive libraries.

1: 
2: 
3: 
open FSharp.Control.Reactive
open FSharp.Control.Reactive.Builders
open FSharp.Control.Reactive.Observable

Define a Helper module.

It includes several observers to help display results.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
module Helper=
    let name=String.Empty
    let PrintThread (location:string)=
            printfn "Loc:%s Thread:%d" location Thread.CurrentThread.ManagedThreadId
    let f1=fun (i:'T)->PrintThread "OnNext"
                       printfn "%s-->%A" name i
    let f2=fun (ex:Exception)->printfn "%s-->%A" name ex.Message
    let f3=fun ()-> PrintThread "OnCompleted"
                    printfn "%s completed" name
    let Dump1<'T>  (name:string) (source:IObservable<'T>) =
        Observable.subscribeWithCallbacks f1 f2 f3 source              
    let Dump2<'T>  (name:string) (source:IObservable<'T>) =
        source |> (Observable.subscribeOn Scheduler.Default) 
               |>  Observable.subscribeWithCallbacks f1 f2 f3              
    let Dump3<'T>  (name:string) (source:IObservable<'T>) =
        source |> (Observable.subscribeOn Scheduler.Default)   
               |> (Observable.observeOn Scheduler.Default) 
               |>  Observable.subscribeWithCallbacks f1 f2 f3              

The following fields are being collected. They are combined to form the field f=nsl1opin http://download.finance.yahoo.com/d/quotes.csv?s=MSFT,GOOG&f=nsl1op&e=.csv URL:

  • name(n),symbol(s),latestValue(l1),open(o),closeValue(p)

Define the URL.

1: 
let url = "http://download.finance.yahoo.com/d/quotes.csv?s="

Define a Stock type to hold Stocks information.

1: 
type Stocks={Name:string;Symbol:string;LatestValue:float;OpenValue:float;CloseValue:float}

Define an Asynchonous Workflow to get Stock data.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
let getStockData stock fields =
    async {
        //try 
            let wc = new WebClient()
            Helper.PrintThread "Async before Download"
            let! data = wc.AsyncDownloadString(Uri(url + stock + "&f="+ fields + "&e=.csv"))
            Helper.PrintThread "Async after Download"
            //let data = wc.DownloadString(url + stock + "&f="+ fields + "&e=.csv")
            //printfn "%s" data
            let dataLines = data.Split([| '\n' |], StringSplitOptions.RemoveEmptyEntries) 
            let s=seq { for line in dataLines do
                            let infos = line.Split(',')
                            yield {Name=infos.[0]; Symbol=infos.[1];
                                    LatestValue=float infos.[2]; OpenValue=float infos.[3]; 
                                    CloseValue=float infos.[4]}
                       }
                |> Array.ofSeq 
            return s //Some s
        //with _ -> return None
      }

You can test getStockInfo using:

1: 
2: 
let r=(getStockData "GOOG,MSFT" "nsl1op") |>Async.RunSynchronously
r.[0],r.[1];;

Create Observables Sequences.

  • Create an "event" (observable) every 5 seconds (fast for testing pursose)

  • Take 3 elements

  • Get Stocks information asynchoronous

  • Split into individual Stocks
1: 
2: 
3: 
4: 
5: 
let t1=Observable.interval (TimeSpan.FromSeconds(5.0))
let o1=t1 |> Observable.take(3) 
let o2= Observable.SelectMany(o1,fun _-> Observable.ofAsync (getStockData "GOOG,MSFT" "nsl1op"))
let oStock1=o2 |> Observable.map (fun (i:Stocks[])->i.[0])
let oStock2=o2 |> Observable.map (fun (i:Stocks[])->i.[1])

Observe with different Observers

Comments:

Case 1:

  • Main thread (1) is not blocked.

  • Observable Sequence is blocking Thread 12.

  • Obsever (OnNext) is called in the same thread as the Observable Sequence

Case 2:

  • Obsever (OnNext) is called in the same thread as the Observable Sequence

Case 3:

  • No blocking of Threads.

Case 1

(using Dump1 observer)

1: 
2: 
3: 
Helper.PrintThread "REPL example1-1-begin"
let d1=Helper.Dump1 "Stock1" oStock1
Helper.PrintThread "REPL example1-1-end"

Use d1.Dispose() to stop it.

Case 2

(using Dump2 observer)

1: 
2: 
3: 
Helper.PrintThread "REPL example1-2-begin"
let d2=Helper.Dump2 "Stock2" oStock2
Helper.PrintThread "REPL example1-2-end"

Use d2.Dispose() to stop it.

Case 3

(using Dump3 observer)

1: 
2: 
3: 
Helper.PrintThread "REPL example1-3-begin"
let d3=Helper.Dump3 "Stock3" oStock2
Helper.PrintThread "REPL example1-3-end"

Use d3.Dispose() to stop it.

The output of the Case 1 is the following:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
//Output:
(*
Loc:REPL example1-1-begin Thread:1
Loc:REPL example1-1-end Thread:1
    Loc:Async before Download Thread:12 //*
    Loc:Async after Download Thread:16
    Loc:OnNext Thread:16
        Stock1-->{Name = ""Google Inc."";Symbol = ""GOOG"";LatestValue = 523.4;OpenValue = 521.08;CloseValue = 521.84;}
    Loc:Async before Download Thread:12 //*
    Loc:Async after Download Thread:15
    Loc:OnNext Thread:15
        Stock1-->{Name = ""Google Inc."";Symbol = ""GOOG"";LatestValue = 523.4;OpenValue = 521.08;CloseValue = 521.84;}
    Loc:Async before Download Thread:12 //*
    Loc:Async after Download Thread:20
    Loc:OnNext Thread:20
        Stock1-->{Name = ""Google Inc."";Symbol = ""GOOG"";LatestValue = 523.4;OpenValue = 521.08;CloseValue = 521.84;}
    Loc:OnCompleted Thread:20
Stock1 completed
*)

The output of the Case 2 is the following:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
//Output:
(*
Loc:REPL example1-2-begin Thread:1
Loc:REPL example1-2-end Thread:1
    Loc:Async before Download Thread:7
    Loc:Async after Download Thread:7 //*
    Loc:OnNext Thread:7 //*
        Stock2-->{Name = ""Microsoft Corporation"";Symbol = ""MSFT"";LatestValue = 44.4;OpenValue = 44.48;CloseValue = 44.44;}
    Loc:Async before Download Thread:20
    Loc:Async after Download Thread:8 //*
    Loc:OnNext Thread:8 //*
        Stock2-->{Name = ""Microsoft Corporation"";Symbol = ""MSFT"";LatestValue = 44.4;OpenValue = 44.48;CloseValue = 44.44;}
    Loc:Async before Download Thread:20
    Loc:Async after Download Thread:7 //*
    Loc:OnNext Thread:7 //*
        Stock2-->{Name = ""Microsoft Corporation"";Symbol = ""MSFT"";LatestValue = 44.4;OpenValue = 44.48;CloseValue = 44.44;}
    Loc:OnCompleted Thread:7 //*
Stock2 completed
*)

The output of the Case 3 is the following:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
//Output:
(*
Loc:REPL example1-3-begin Thread:1
Loc:REPL example1-3-end Thread:1
    Loc:Async before Download Thread:6
    Loc:Async after Download Thread:6
    Loc:OnNext Thread:15
        Stock3-->{Name = ""Microsoft Corporation"";Symbol = ""MSFT"";LatestValue = 44.4;OpenValue = 44.48;CloseValue = 44.44;}
    Loc:Async before Download Thread:16
    Loc:Async after Download Thread:6
    Loc:OnNext Thread:15
        Stock3-->{Name = ""Microsoft Corporation"";Symbol = ""MSFT"";LatestValue = 44.4;OpenValue = 44.48;CloseValue = 44.44;}
    Loc:Async before Download Thread:5
    Loc:Async after Download Thread:6
    Loc:OnNext Thread:15
        Stock3-->{Name = ""Microsoft Corporation"";Symbol = ""MSFT"";LatestValue = 44.4;OpenValue = 44.48;CloseValue = 44.44;}
    Loc:OnCompleted Thread:15
Stock3 completed
*)

Query Style

Repeat the same example using Query style programming.

  • Create an "event" (observable) every 5 seconds (fast for testing pursose)

  • Use observe to create a Observable Sequence that gets Stocks data asynchoronous.

  • Use rxquery to take 5 elements of the Stock Observable Sequence.

  • Use observe to split into individual Stocks Observable Sequence.
 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
let t1a=Observable.interval (TimeSpan.FromSeconds(5.0))
let QStocksA= observe {
        let! r=Observable.ofAsync (getStockData "GOOG,MSFT" "nsl1op") //Is it blocking?
        yield r
    }
let QStocksB= rxquery {
        for i in t1a do
        take 5
        for j in QStocksA do
        select j
}
let QStocksC1= observe {
        let! r=QStocksB
        yield r.[0].LatestValue
    }
let QStocksC2= observe {
        let! r=QStocksB
        yield r.[1].LatestValue
    }

let d3a=Helper.Dump3 "Stock1" QStocksC1
let d3b=Helper.Dump3 "Stock1" QStocksC2

Use d3a.Dispose() and d3b.Dispose() to stop observation.

To do:

Change the Query style solution to make it efficient.* To keep it simple, this Query solution creates one Observable Sequence for each Stock. This makes one Stock data internet collection per each Stock.

namespace System
namespace System.Collections
namespace System.Collections.Generic
namespace System.Threading
namespace System.Net
namespace System.Reactive
namespace System.Reactive.Linq
namespace System.Reactive.Disposables
namespace System.Reactive.Concurrency
namespace System.Reactive.Subjects
namespace Microsoft
namespace Microsoft.Reactive
namespace Microsoft.Reactive.Testing
namespace FSharp
namespace FSharp.Control
namespace FSharp.Control.Reactive
module Builders

from FSharp.Control.Reactive
module Observable

from FSharp.Control.Reactive
val name : string

Full name: Test_1.Helper.name
Multiple items
type String =
  new : value:char -> string + 7 overloads
  member Chars : int -> char
  member Clone : unit -> obj
  member CompareTo : value:obj -> int + 1 overload
  member Contains : value:string -> bool
  member CopyTo : sourceIndex:int * destination:char[] * destinationIndex:int * count:int -> unit
  member EndsWith : value:string -> bool + 2 overloads
  member Equals : obj:obj -> bool + 2 overloads
  member GetEnumerator : unit -> CharEnumerator
  member GetHashCode : unit -> int
  ...

Full name: System.String

--------------------
String(value: nativeptr<char>) : unit
String(value: nativeptr<sbyte>) : unit
String(value: char []) : unit
String(c: char, count: int) : unit
String(value: nativeptr<char>, startIndex: int, length: int) : unit
String(value: nativeptr<sbyte>, startIndex: int, length: int) : unit
String(value: char [], startIndex: int, length: int) : unit
String(value: nativeptr<sbyte>, startIndex: int, length: int, enc: Text.Encoding) : unit
field string.Empty
val PrintThread : location:string -> unit

Full name: Test_1.Helper.PrintThread
val location : string
Multiple items
val string : value:'T -> string

Full name: Microsoft.FSharp.Core.Operators.string

--------------------
type string = String

Full name: Microsoft.FSharp.Core.string
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
Multiple items
type Thread =
  inherit CriticalFinalizerObject
  new : start:ThreadStart -> Thread + 3 overloads
  member Abort : unit -> unit + 1 overload
  member ApartmentState : ApartmentState with get, set
  member CurrentCulture : CultureInfo with get, set
  member CurrentUICulture : CultureInfo with get, set
  member DisableComObjectEagerCleanup : unit -> unit
  member ExecutionContext : ExecutionContext
  member GetApartmentState : unit -> ApartmentState
  member GetCompressedStack : unit -> CompressedStack
  member GetHashCode : unit -> int
  ...

Full name: System.Threading.Thread

--------------------
Thread(start: ThreadStart) : unit
Thread(start: ParameterizedThreadStart) : unit
Thread(start: ThreadStart, maxStackSize: int) : unit
Thread(start: ParameterizedThreadStart, maxStackSize: int) : unit
property Thread.CurrentThread: Thread
property Thread.ManagedThreadId: int
val f1 : i:'T -> unit

Full name: Test_1.Helper.f1
val i : 'T
val f2 : ex:Exception -> unit

Full name: Test_1.Helper.f2
val ex : Exception
Multiple items
type Exception =
  new : unit -> Exception + 2 overloads
  member Data : IDictionary
  member GetBaseException : unit -> Exception
  member GetObjectData : info:SerializationInfo * context:StreamingContext -> unit
  member GetType : unit -> Type
  member HelpLink : string with get, set
  member InnerException : Exception
  member Message : string
  member Source : string with get, set
  member StackTrace : string
  ...

Full name: System.Exception

--------------------
Exception() : unit
Exception(message: string) : unit
Exception(message: string, innerException: exn) : unit
property Exception.Message: string
val f3 : unit -> unit

Full name: Test_1.Helper.f3
val Dump1 : name:string -> source:IObservable<'T> -> IDisposable

Full name: Test_1.Helper.Dump1
val name : string
val source : IObservable<'T>
type IObservable<'T> =
  member Subscribe : observer:IObserver<'T> -> IDisposable

Full name: System.IObservable<_>
type Observable =
  static member Aggregate<'TSource> : source:IObservable<'TSource> * accumulator:Func<'TSource, 'TSource, 'TSource> -> IObservable<'TSource> + 2 overloads
  static member All<'TSource> : source:IObservable<'TSource> * predicate:Func<'TSource, bool> -> IObservable<bool>
  static member Amb<'TSource> : params sources:IObservable<'TSource>[] -> IObservable<'TSource> + 2 overloads
  static member And<'TLeft, 'TRight> : left:IObservable<'TLeft> * right:IObservable<'TRight> -> Pattern<'TLeft, 'TRight>
  static member Any<'TSource> : source:IObservable<'TSource> -> IObservable<bool> + 1 overload
  static member AsObservable<'TSource> : source:IObservable<'TSource> -> IObservable<'TSource>
  static member Average : source:IObservable<float> -> IObservable<float> + 19 overloads
  static member Buffer<'TSource, 'TBufferClosing> : source:IObservable<'TSource> * bufferClosingSelector:Func<IObservable<'TBufferClosing>> -> IObservable<IList<'TSource>> + 10 overloads
  static member Case<'TValue, 'TResult> : selector:Func<'TValue> * sources:IDictionary<'TValue, IObservable<'TResult>> -> IObservable<'TResult> + 2 overloads
  static member Cast<'TResult> : source:IObservable<obj> -> IObservable<'TResult>
  ...

Full name: System.Reactive.Linq.Observable
val subscribeWithCallbacks : onNext:('T -> unit) -> onError:(exn -> unit) -> onCompleted:(unit -> unit) -> observable:IObservable<'T> -> IDisposable

Full name: FSharp.Control.Reactive.Observable.subscribeWithCallbacks
val Dump2 : name:string -> source:IObservable<'T> -> IDisposable

Full name: Test_1.Helper.Dump2
val subscribeOn : scheduler:IScheduler -> source:IObservable<'Source> -> IObservable<'Source>

Full name: FSharp.Control.Reactive.Observable.subscribeOn
type Scheduler =
  static member AsLongRunning : scheduler:IScheduler -> ISchedulerLongRunning
  static member AsPeriodic : scheduler:IScheduler -> ISchedulerPeriodic
  static member AsStopwatchProvider : scheduler:IScheduler -> IStopwatchProvider
  static member Catch<'TException> : scheduler:IScheduler * handler:Func<'TException, bool> -> IScheduler
  static member CurrentThread : CurrentThreadScheduler
  static member Default : DefaultScheduler
  static member DisableOptimizations : scheduler:IScheduler -> IScheduler + 1 overload
  static member Immediate : ImmediateScheduler
  static member NewThread : IScheduler
  static member Normalize : timeSpan:TimeSpan -> TimeSpan
  ...

Full name: System.Reactive.Concurrency.Scheduler
property Scheduler.Default: DefaultScheduler
val Dump3 : name:string -> source:IObservable<'T> -> IDisposable

Full name: Test_1.Helper.Dump3
val observeOn : scheduler:IScheduler -> source:IObservable<'a> -> IObservable<'a>

Full name: FSharp.Control.Reactive.Observable.observeOn
val url : string

Full name: Test_1.url
type Stocks =
  {Name: string;
   Symbol: string;
   LatestValue: float;
   OpenValue: float;
   CloseValue: float;}

Full name: Test_1.Stocks
Stocks.Name: string
Stocks.Symbol: string
Stocks.LatestValue: float
Multiple items
val float : value:'T -> float (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.float

--------------------
type float = Double

Full name: Microsoft.FSharp.Core.float

--------------------
type float<'Measure> = float

Full name: Microsoft.FSharp.Core.float<_>
Stocks.OpenValue: float
Stocks.CloseValue: float
val getStockData : stock:string -> fields:string -> Async<Stocks []>

Full name: Test_1.getStockData
val stock : string
val fields : string
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val wc : WebClient
Multiple items
type WebClient =
  inherit Component
  new : unit -> WebClient
  member BaseAddress : string with get, set
  member CachePolicy : RequestCachePolicy with get, set
  member CancelAsync : unit -> unit
  member Credentials : ICredentials with get, set
  member DownloadData : address:string -> byte[] + 1 overload
  member DownloadDataAsync : address:Uri -> unit + 1 overload
  member DownloadFile : address:string * fileName:string -> unit + 1 overload
  member DownloadFileAsync : address:Uri * fileName:string -> unit + 1 overload
  member DownloadString : address:string -> string + 1 overload
  ...

Full name: System.Net.WebClient

--------------------
WebClient() : unit
module Helper

from Test_1
val data : string
member WebClient.AsyncDownloadString : address:Uri -> Async<string>
Multiple items
type Uri =
  new : uriString:string -> Uri + 5 overloads
  member AbsolutePath : string
  member AbsoluteUri : string
  member Authority : string
  member DnsSafeHost : string
  member Equals : comparand:obj -> bool
  member Fragment : string
  member GetComponents : components:UriComponents * format:UriFormat -> string
  member GetHashCode : unit -> int
  member GetLeftPart : part:UriPartial -> string
  ...

Full name: System.Uri

--------------------
Uri(uriString: string) : unit
Uri(uriString: string, uriKind: UriKind) : unit
Uri(baseUri: Uri, relativeUri: string) : unit
Uri(baseUri: Uri, relativeUri: Uri) : unit
val dataLines : string []
String.Split(params separator: char []) : string []
String.Split(separator: string [], options: StringSplitOptions) : string []
String.Split(separator: char [], options: StringSplitOptions) : string []
String.Split(separator: char [], count: int) : string []
String.Split(separator: string [], count: int, options: StringSplitOptions) : string []
String.Split(separator: char [], count: int, options: StringSplitOptions) : string []
type StringSplitOptions =
  | None = 0
  | RemoveEmptyEntries = 1

Full name: System.StringSplitOptions
field StringSplitOptions.RemoveEmptyEntries = 1
val s : Stocks []
Multiple items
val seq : sequence:seq<'T> -> seq<'T>

Full name: Microsoft.FSharp.Core.Operators.seq

--------------------
type seq<'T> = IEnumerable<'T>

Full name: Microsoft.FSharp.Collections.seq<_>
val line : string
val infos : string []
type Array =
  member Clone : unit -> obj
  member CopyTo : array:Array * index:int -> unit + 1 overload
  member GetEnumerator : unit -> IEnumerator
  member GetLength : dimension:int -> int
  member GetLongLength : dimension:int -> int64
  member GetLowerBound : dimension:int -> int
  member GetUpperBound : dimension:int -> int
  member GetValue : params indices:int[] -> obj + 7 overloads
  member Initialize : unit -> unit
  member IsFixedSize : bool
  ...

Full name: System.Array
val ofSeq : source:seq<'T> -> 'T []

Full name: Microsoft.FSharp.Collections.Array.ofSeq
val r : Stocks []

Full name: Test_1.r
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task -> Async<unit>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
static member Async.RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
val t1 : IObservable<int64>

Full name: Test_1.t1
val interval : period:TimeSpan -> IObservable<int64>

Full name: FSharp.Control.Reactive.Observable.interval
Multiple items
type TimeSpan =
  struct
    new : ticks:int64 -> TimeSpan + 3 overloads
    member Add : ts:TimeSpan -> TimeSpan
    member CompareTo : value:obj -> int + 1 overload
    member Days : int
    member Duration : unit -> TimeSpan
    member Equals : value:obj -> bool + 1 overload
    member GetHashCode : unit -> int
    member Hours : int
    member Milliseconds : int
    member Minutes : int
    ...
  end

Full name: System.TimeSpan

--------------------
TimeSpan()
TimeSpan(ticks: int64) : unit
TimeSpan(hours: int, minutes: int, seconds: int) : unit
TimeSpan(days: int, hours: int, minutes: int, seconds: int) : unit
TimeSpan(days: int, hours: int, minutes: int, seconds: int, milliseconds: int) : unit
TimeSpan.FromSeconds(value: float) : TimeSpan
val o1 : IObservable<int64>

Full name: Test_1.o1
val take : n:int -> source:IObservable<'Source> -> IObservable<'Source>

Full name: FSharp.Control.Reactive.Observable.take
val o2 : IObservable<Stocks []>

Full name: Test_1.o2
Observable.SelectMany<'TSource,'TResult>(source: IObservable<'TSource>, selector: Func<'TSource,int,IEnumerable<'TResult>>) : IObservable<'TResult>
   (+0 other overloads)
Observable.SelectMany<'TSource,'TResult>(source: IObservable<'TSource>, selector: Func<'TSource,IEnumerable<'TResult>>) : IObservable<'TResult>
   (+0 other overloads)
Observable.SelectMany<'TSource,'TResult>(source: IObservable<'TSource>, selector: Func<'TSource,int,CancellationToken,Tasks.Task<'TResult>>) : IObservable<'TResult>
   (+0 other overloads)
Observable.SelectMany<'TSource,'TResult>(source: IObservable<'TSource>, selector: Func<'TSource,CancellationToken,Tasks.Task<'TResult>>) : IObservable<'TResult>
   (+0 other overloads)
Observable.SelectMany<'TSource,'TResult>(source: IObservable<'TSource>, selector: Func<'TSource,int,Tasks.Task<'TResult>>) : IObservable<'TResult>
   (+0 other overloads)
Observable.SelectMany<'TSource,'TResult>(source: IObservable<'TSource>, selector: Func<'TSource,Tasks.Task<'TResult>>) : IObservable<'TResult>
   (+0 other overloads)
Observable.SelectMany<'TSource,'TResult>(source: IObservable<'TSource>, selector: Func<'TSource,int,IObservable<'TResult>>) : IObservable<'TResult>
   (+0 other overloads)
Observable.SelectMany<'TSource,'TResult>(source: IObservable<'TSource>, selector: Func<'TSource,IObservable<'TResult>>) : IObservable<'TResult>
   (+0 other overloads)
Observable.SelectMany<'TSource,'TOther>(source: IObservable<'TSource>, other: IObservable<'TOther>) : IObservable<'TOther>
   (+0 other overloads)
Observable.SelectMany<'TSource,'TCollection,'TResult>(source: IObservable<'TSource>, collectionSelector: Func<'TSource,int,IEnumerable<'TCollection>>, resultSelector: Func<'TSource,int,'TCollection,int,'TResult>) : IObservable<'TResult>
   (+0 other overloads)
val ofAsync : asyncOperation:Async<'a> -> IObservable<'a>

Full name: FSharp.Control.Reactive.Observable.ofAsync
val oStock1 : IObservable<Stocks>

Full name: Test_1.oStock1
Multiple items
val map : f:('a -> 'b) -> source:IObservable<'a> -> IObservable<'b>

Full name: FSharp.Control.Reactive.Observable.map

--------------------
val map : mapping:('T -> 'U) -> source:IObservable<'T> -> IObservable<'U>

Full name: Microsoft.FSharp.Control.Observable.map
val i : Stocks []
val oStock2 : IObservable<Stocks>

Full name: Test_1.oStock2
val d1 : IDisposable

Full name: Test_1.d1
val d2 : IDisposable

Full name: Test_1.d2
val d3 : IDisposable

Full name: Test_1.d3
val t1a : IObservable<int64>

Full name: Test_1.t1a
val QStocksA : IObservable<Stocks []>

Full name: Test_1.QStocksA
val observe : ObservableBuilder

Full name: FSharp.Control.Reactive.Builders.observe
val r : Stocks []
val QStocksB : IObservable<Stocks []>

Full name: Test_1.QStocksB
val rxquery : RxQueryBuilder

Full name: FSharp.Control.Reactive.Builders.rxquery
val i : int64
custom operation: take (int)

Calls RxQueryBuilder.Take
val j : Stocks []
custom operation: select ('a)

Calls RxQueryBuilder.Select
val QStocksC1 : IObservable<float>

Full name: Test_1.QStocksC1
val QStocksC2 : IObservable<float>

Full name: Test_1.QStocksC2
val d3a : IDisposable

Full name: Test_1.d3a
val d3b : IDisposable

Full name: Test_1.d3b

Saturday, May 16, 2015

F# and MQTT - example 1:

This post presents a F# Interactive Script example of the MQTT protocol.
Using the same client node, the example does Publish and Subscribe, of Simple and Complex data, using Json and Binary serialization. The following software were used:
-Server: Mosquitto
-Client: M2Mqtt
-Serialization: Json: Json.Net, Binary: FSPickler

MQTT

MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport.
MQTT v3.1.1 has now become an OASIS Standard.
MQTT stands for MQ Telemetry Transport. It is a publish/subscribe, extremely simple and lightweight messaging protocol, designed for constrained devices and low-bandwidth, high-latency or unreliable networks. The design principles are to minimize network bandwidth and device resource requirements whilst also attempting to ensure reliability and some degree of assurance of delivery. These principles also turn out to make the protocol ideal of the emerging “machine-to-machine” (M2M) or “Internet of Things” world of connected devices, and for mobile applications where bandwidth and battery power are at a premium.

Concepts

Publish/Subscribe:

The MQTT protocol is based on the principle of publishing messages and subscribing to topics, or "pub/sub". Multiple clients connect to a broker and subscribe to topics that they are interested in. Clients also connect to the broker and publish messages to topics. Many clients may subscribe to the same topics and do with the information as they please. The broker and MQTT act as a simple, common interface for everything to connect to. This means that you if you have clients that dump subscribed messages to a database, to Twitter or even a simple text file, then it becomes very simple to add new sensors or other data input to a database,Twitter or so on.

Topics/Subscriptions: 

Messages in MQTT are published on topics. There is no need to configure a topic, publishing on it is enough. Topics are treated as a hierarchy, using a slash (/) as a separator. This allows sensible arrangement of common themes to be created, much in the same way as a filesystem. For example, multiple computers may all publish their hard drive temperature information on the following topic, with their own computer and hard drive name being replaced as appropriate:sensors/COMPUTER_NAME/temperature/HARDDRIVE_NAME Clients can receive messages by creating subscriptions. A subscription may be to an explicit topic, in which case only messages to that topic will be received, or it may include wildcards. Two wildcards are available, + or #.
+ can be used as a wildcard for a single level of hierarchy. It could be used with the topic above to get information on all computers and hard drives as follows:sensors/+/temperature/+ # can be used as a wildcard for all remaining levels of hierarchy. This means that it must be the final character in a subscription. With a topic of "a/b/c/d", the following example subscriptions will match: a/b/#, a/b/c/#, +/b/c/# Zero length topic levels are valid.

Clean session / Durable connections:

 On connection, a client sets the "clean session" flag, which is sometimes also known as the "clean start" flag. If clean session is set to false, then the connection is treated as durable. This means that when the client disconnects, any subscriptions it has will remain and any subsequent QoS 1 or 2 messages will be stored until it connects again in the future. If clean session is true, then all subscriptions will be removed for the client when it disconnects. Will: When a client connects to a broker, it may inform the broker that it has a will. This is a message that it wishes the broker to send when the client disconnects unexpectedly. The will message has a topic, QoS and retain status just the same as any other message.

Retained Messages: 

All messages may be set to be retained. This means that the broker will keep the message even after sending it to all current subscribers. If a new subscription is made that matches the topic of the retained message, then the message will be sent to the client. This is useful as a "last known good" mechanism. If a topic is only updated infrequently, then without a retained message, a newly subscribed client may have to wait a long time to receive an update. With a retained message, the client will receive an instant update. Quality of Service: MQTT defines three levels of Quality of Service (QoS). The QoS defines how hard the broker/client will try to ensure that a message is received. Messages may be sent at any QoS level, and clients may attempt to subscribe to topics at any QoS level. This means that the client chooses the maximum QoS it will receive. For example, if a message is published at QoS 2 and a client is subscribed with QoS 0, the message will be delivered to that client with QoS 0. If a second client is also subscribed to the same topic, but with QoS 2, then it will receive the same message but with QoS 2. For a second example, if a client is subscribed with QoS 2 and a message is published on QoS 0, the client will receive it on QoS 0. Higher levels of QoS are more reliable, but involve higher latency and have higher bandwith requirements. 0: The broker/client will deliver the message once, with no confirmation. 1: The broker/client will deliver the message at least once, with confirmation required. 2: The broker/client will deliver the message exactly once by using a four step handshake.

REMARKS

Windows Installation: Mosquitto is installed in "Program Files (x86)" where the files are read-only and requires Admin priviledges. For now, to update the configuration file, make a copy to another folder, do the update and copy it back.

F# Example 1

Script Code:

//M2MqttTest_1.fsx
//Celso Axelrud
//rev.: 5/16/2015-4:15pm

(*
MQTT example 1: Same client node Pub/Sub of simple and complex data using json and binary serialization.
 
Server: Mosquitto
Client: M2Mqtt
Serialization:
    -Json: Json.Net
    -Binary: FSPickler
*)

//Libraries-----------------------
#I @"C:\Project(comp)\Dev_2015\MQTT_1\MQTT_Proj_1\Lib"
#r "M2Mqtt.dll" 
#r "Newtonsoft.Json.dll"
#r "FsPickler.dll"

//Open System---------------------
open System
open System.Text

//Open M2Mqtt---------------------
open uPLibrary.Networking.M2Mqtt
open uPLibrary.Networking.M2Mqtt.Exceptions;
open uPLibrary.Networking.M2Mqtt.Messages;
open uPLibrary.Networking.M2Mqtt.Session;
open uPLibrary.Networking.M2Mqtt.Utility;
open uPLibrary.Networking.M2Mqtt.Internal;

//Create client node--------------
let node = new MqttClient(brokerHostName="localhost")

//Create subscription handles-----
//Received 
let MsgReceived (e:MqttMsgPublishEventArgs) =
    printfn "Sub Received Topic: %s" e.Topic
    printfn "Sub Received Qos: %u" e.QosLevel
    printfn "Sub Received Retain: %b" e.Retain
    printfn "Sub Received Message: %s" (Encoding.ASCII.GetString e.Message)
node.MqttMsgPublishReceived.Add(MsgReceived)

//Publish (requires QoS Level 1 or 2)
let MsgPublish (e:MqttMsgPublishedEventArgs) =
    printfn "Pub Message Published: %b " e.IsPublished
node.MqttMsgPublished.Add(MsgPublish)

//Subscribed (requires QoS Level 1 or 2)
let MsgSubscribed (e:MqttMsgSubscribedEventArgs) =
    printfn "Sub Message Subscribed: %s " (Encoding.ASCII.GetString e.GrantedQoSLevels) 
node.MqttMsgSubscribed.Add(MsgSubscribed)

//Unsubscribed (requires QoS Level 1 or 2) 
let MsgUnsubscribed (e:MqttMsgUnsubscribedEventArgs) =
    printfn "Sub Message Unsubscribed: %i " e.MessageId 
node.MqttMsgUnsubscribed.Add(MsgUnsubscribed)

//Connect-------------------------
node.Connect(clientId="Node1Conn1",username="caxelrud",password="laranja1",
                willRetain=false,willQosLevel=0uy,willFlag=true,willTopic="system/LWT/Node1Conn1",willMessage="offline",
                cleanSession=true,keepAlivePeriod=60us)

//Get node info (Interactive)-----
node;;
node.CleanSession;; node.ClientId;; node.Settings;;
node.WillFlag;; node.WillQosLevel;;
node.WillTopic;; node.WillMessage;;

//Subscribe-----------------------
let  topics1:string[] = [| "sensor/A10/TI100"; "sensor/A10/FI001" |]
let qosLevels1:byte[] = [|MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE; MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE |]
let grantedQos1 = node.Subscribe(topics1, qosLevels1)

//Subscribe to Connection Last Will 
let topics2:string[] = [| "system/LWT/Node1Conn1" |]
let qosLevels2:byte[] = [|MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE |]
let grantedQos2 = node.Subscribe(topics2, qosLevels2)

//Publish-------------------------
let mutable Temp1="100.0"
node.Publish("sensor/A10/TI100", Encoding.UTF8.GetBytes(Temp1))

//Publish-------------------------
Temp1<-"200.0"
node.Publish("sensor/A10/TI100", Encoding.UTF8.GetBytes(Temp1))

//Publish but no notification because we didn't subscribe to this point
node.Publish("sensor/A11/TI101", Encoding.UTF8.GetBytes("1.0"))

//Complex Messages----------------
//json----------------------------
open Newtonsoft.Json

//Type for a group of tags with group name and group date & time
type Tag={Name:string;Value:float;Qual:int}
type Rec={Name:string;Time:System.DateTime;Tags:Tag list}

//Subscribe-----------------------
let  topics3:string[] = [| "sensor/A10/Group1" |]
let qosLevels3:byte[] = [|MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE|]
let grantedQos3 = node.Subscribe(topics3, qosLevels3)

//Generate random values----------
let values1=[for i in 1..2 -> System.Random().NextDouble() ]

//Create the complex point--------
let tags1 = [
        { Name = "PI100"; Value = values1.[0];Qual=0 };
        { Name = "PI101"; Value = values1.[1];Qual=0 }
    ]
let grp1={Name="Group1";Time=System.DateTime.UtcNow;Tags=tags1}

//Serialize-----------------------
let grp1j = JsonConvert.SerializeObject(grp1)
//Deserialize (test)-------------- 
let grp1d = JsonConvert.DeserializeObject<Rec>(grp1j)
//Publish-------------------------
node.Publish("sensor/A10/Group1", message=Encoding.UTF8.GetBytes(grp1j),qosLevel=0uy,retain=true)

//binary serialization---------------
open Nessos.FsPickler

//Subscribe-----------------------
let  topics4:string[] = [| "sensor/A10/Group2" |]
let qosLevels4:byte[] = [|MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE|]
let grantedQos4 = node.Subscribe(topics4, qosLevels4)

//Create the complex point--------
let grp2={Name="Group2";Time=System.DateTime.UtcNow;Tags=tags1}
let binary = FsPickler.CreateBinary()

//Serialize-----------------------
let grp1b=binary.Pickle grp2

//Deserialize (test)-------------- 
let grp1db=binary.UnPickle<Rec> grp1b

//Publish-------------------------
node.Publish("sensor/A10/Group2", message=grp1b,qosLevel=0uy,retain=true)

//Unsubscribe---------------------
node.Unsubscribe(topics1)
node.Unsubscribe(topics2)
node.Unsubscribe(topics3)
node.Unsubscribe(topics4)

//Client Disconnect---------------
node.Disconnect()

Mosquitto Parameters:

#retry_interval 20 #sys_interval 10 #store_clean_interval 10 #pid_file #user mosquitto #max_inflight_messages 20 #max_queued_messages 100 #queue_qos0_messages false #message_size_limit 0 #allow_zero_length_clientid true #auto_id_prefix #persistent_client_expiration #allow_duplicate_messages false #upgrade_outgoing_qos false #bind_address #port 1883 #max_connections -1 #protocol mqtt #http_dir #use_username_as_clientid #cafile #capath #certfile #keyfile #tls_version #require_certificate false #use_identity_as_username false #crlfile #ciphers DEFAULT:!aNULL:!eNULL:!LOW:!EXPORT:!SSLv2:@STRENGTH #psk_hint #ciphers #listener #max_connections -1 #mount_point #protocol mqtt #http_dir #use_username_as_clientid #cafile #capath #certfile #keyfile #require_certificate false #crlfile #ciphers #psk_hint #ciphers #autosave_interval 1800 #autosave_on_changes false #persistence false #persistence_file mosquitto.db #persistence_location #log_dest stderr #log_facility #log_type error #log_type warning #log_type notice #log_type information #websockets_log_level 0 #connection_messages true #log_timestamp true #clientid_prefixes #auth_plugin #password_file #psk_file #acl_file #connection <name> #address <host>[:<port>] [<host>[:<port>]] #topic <topic> [[[out | in | both] qos-level] local-prefix remote-prefix] #bridge_attempt_unsubscribe true #round_robin false #remote_clientid #local_clientid #cleansession false #notifications true #notification_topic #keepalive_interval 60 #start_type automatic #restart_timeout 30 #idle_timeout 60 #threshold 10 #try_private true #remote_username #remote_password #bridge_cafile #bridge_capath #bridge_certfile #bridge_keyfile #bridge_insecure false #bridge_identity #bridge_psk #include_dir #ffdc_output #max_log_entries #trace_level #trace_output *)