HIGH PERFORMANCE REAL-TIME STOCKS COMPUTATION ENGINE - part 1
This blog post explores a High Performance Real-Time Stocks Computation Engine for data collection and processing. It can serve as the basis of a real-time trading system.
High performance is obtained by using reactive, non-blocking code.
The code has the following properties:
- Developed as an F# script for the REPL (F# Interactive).
- Uses .NET Reactive Extensions and FSharp.Control.Reactive for PUSH pattern style coding.
- Uses F# Asynchronous Workflows for PULL pattern style coding.
- Collects Stock data through the Yahoo Finance APIs documented at https://code.google.com/p/yahoo-finance-managed/wiki/YahooFinanceAPIs
As an overall pattern, PUSH is the preferred style, even though the example presented here starts by periodically pulling data from the web.
The main reason is that communication with other processes and computers is expected to use Publish/Subscribe, which is a PUSH pattern.
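The difference between the two styles can be sketched with the F# core library alone. In this minimal illustration, `pullPrice`, `priceFeed`, and the doubling step are hypothetical names and values chosen for the example, not part of the engine. The PULL side asks for a value and waits; the PUSH side subscribes once and is notified whenever a value is published.

```fsharp
// PULL: the consumer decides when to ask and waits for the answer.
// `pullPrice` is a hypothetical stand-in for a web request.
let pullPrice () : Async<float> = async { return 42.0 }
let pulled = pullPrice () |> Async.RunSynchronously

// PUSH: the producer publishes values and subscribers are notified.
// `priceFeed` is a hypothetical in-process publisher.
let priceFeed = Event<float>()
let received = ResizeArray<float>()

let subscription =
    priceFeed.Publish
    |> Observable.map (fun price -> price * 2.0)
    |> Observable.subscribe (fun price -> received.Add price)

priceFeed.Trigger 1.0      // the subscriber receives 2.0
priceFeed.Trigger 2.5      // the subscriber receives 5.0
subscription.Dispose()     // unsubscribe
priceFeed.Trigger 9.0      // nobody is listening any more
```

Reactive Extensions follows the same subscribe/dispose shape, with schedulers and many more operators on top.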
Example steps
Define references to the libraries.
For Reactive Extensions.
For FSharp.Control.Reactive.
Open the libraries.
Open Reactive Extensions libraries.
Open FSharp.Control.Reactive libraries.
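The reference and open steps above might look like the following in a current script. This is a sketch under assumptions: it relies on a recent `dotnet fsi` that can resolve NuGet packages with `#r "nuget: ..."`, whereas the original script probably referenced assembly files on disk, and the exact set of namespaces opened is a guess.

```fsharp
// Assumption: a recent `dotnet fsi`, which resolves NuGet packages directly.
#r "nuget: System.Reactive"
#r "nuget: FSharp.Control.Reactive"

// General .NET namespaces.
open System
open System.Net
open System.Threading

// Reactive Extensions.
open System.Reactive
open System.Reactive.Linq
open System.Reactive.Concurrency

// F# wrappers and computation expression builders for Rx.
open FSharp.Control.Reactive
open FSharp.Control.Reactive.Builders
```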
Define a Helper module.
It includes several observers to help display results.
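A helper module along these lines could provide the three observers. It is a hypothetical reconstruction, not the original listing: the message formats and the choice of `NewThreadScheduler.Default` are assumptions, and only the function names `subscribeWithCallbacks`, `subscribeOn`, and `observeOn` come from FSharp.Control.Reactive.

```fsharp
#r "nuget: System.Reactive"
#r "nuget: FSharp.Control.Reactive"

open System
open System.Threading
open System.Reactive.Concurrency
open FSharp.Control.Reactive

module Helper =
    /// Prints a message tagged with the id of the thread that runs it.
    let PrintThread (location: string) =
        printfn "%s on thread %d" location Thread.CurrentThread.ManagedThreadId

    // Callbacks for OnNext, OnError and OnCompleted.
    let f1 (name: string) (value: 'T) = PrintThread (sprintf "%s OnNext(%A)" name value)
    let f2 (name: string) (error: exn) = PrintThread (sprintf "%s OnError(%s)" name error.Message)
    let f3 (name: string) () = PrintThread (sprintf "%s OnCompleted" name)

    /// Subscribes directly: callbacks run on the thread that produces the values.
    let Dump1 (name: string) (source: IObservable<'T>) : IDisposable =
        source |> Observable.subscribeWithCallbacks (f1 name) (f2 name) (f3 name)

    /// Runs the subscription logic on a scheduler (assumed here: a new thread).
    let Dump2 (name: string) (source: IObservable<'T>) : IDisposable =
        source
        |> Observable.subscribeOn (NewThreadScheduler.Default :> IScheduler)
        |> Observable.subscribeWithCallbacks (f1 name) (f2 name) (f3 name)

    /// Delivers the notifications on a scheduler (assumed here: a new thread).
    let Dump3 (name: string) (source: IObservable<'T>) : IDisposable =
        source
        |> Observable.observeOn (NewThreadScheduler.Default :> IScheduler)
        |> Observable.subscribeWithCallbacks (f1 name) (f2 name) (f3 name)
```

Each function returns the `IDisposable` of the subscription, so the caller keeps a handle to stop observing.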
The following fields are collected. Their codes are combined to form the parameter f=nsl1op in the URL http://download.finance.yahoo.com/d/quotes.csv?s=MSFT,GOOG&f=nsl1op&e=.csv:
- name (n), symbol (s), latest value (l1), open (o), close value (p)
Define the URL.
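As a small sketch, the URL can be assembled from the symbols and field codes rather than typed by hand. The helper `buildUrl` and the value names are hypothetical; the resulting string is the example URL quoted above.

```fsharp
// Symbols and field codes taken from the example URL in the text.
let symbols = [ "MSFT"; "GOOG" ]
let fields = "nsl1op"   // name, symbol, latest value, open, close value

/// Builds the quotes URL for a list of symbols and a field string.
let buildUrl (symbols: string list) (fields: string) : string =
    let symbolList = String.concat "," symbols
    sprintf "http://download.finance.yahoo.com/d/quotes.csv?s=%s&f=%s&e=.csv" symbolList fields

let url = buildUrl symbols fields
```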
Define a Stock type to hold Stocks information.
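A record with one field per requested code is a natural fit for this step. The sketch below uses the field names Name, Symbol, LatestValue, OpenValue and CloseValue; the sample values are illustrative only and are not market data.

```fsharp
/// One quote, with one record field per requested Yahoo field code.
type Stocks =
    { Name: string          // n
      Symbol: string        // s
      LatestValue: float    // l1
      OpenValue: float      // o
      CloseValue: float }   // p

// Illustrative values only, not real market data.
let sample =
    { Name = "Microsoft Corporation"
      Symbol = "MSFT"
      LatestValue = 10.5
      OpenValue = 10.0
      CloseValue = 9.75 }

// Records are immutable: a new quote is a copy with some fields replaced.
let updated = { sample with LatestValue = 11.0 }
```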
Define an Asynchronous Workflow to get Stock data.
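One way to write such a workflow is sketched below. Several things are assumptions: `HttpClient` is swapped in for the download, the downloader is passed in as a parameter (`getStockDataWith` and `downloadString` are hypothetical names) so that parsing can be tried offline, and the CSV parser is deliberately simple. The Yahoo CSV endpoint may also no longer respond.

```fsharp
open System
open System.Globalization
open System.Net.Http

type Stocks =
    { Name: string
      Symbol: string
      LatestValue: float
      OpenValue: float
      CloseValue: float }

/// Parses one CSV line such as: "Some Company","ABC",10.5,10.0,9.75
/// Simplification: assumes the name itself contains no comma.
let parseLine (line: string) : Stocks =
    let cells = line.Split(',') |> Array.map (fun cell -> cell.Trim().Trim('"'))
    let number (text: string) = Double.Parse(text, CultureInfo.InvariantCulture)
    { Name = cells.[0]
      Symbol = cells.[1]
      LatestValue = number cells.[2]
      OpenValue = number cells.[3]
      CloseValue = number cells.[4] }

/// Downloads the CSV with `download`, then parses every non-empty line.
let getStockDataWith (download: string -> Async<string>) (url: string) : Async<Stocks[]> =
    async {
        let! csv = download url
        let lines = csv.Split([| '\r'; '\n' |], StringSplitOptions.RemoveEmptyEntries)
        return lines |> Array.map parseLine
    }

// Real downloader (assumption: HttpClient, awaited without blocking a thread).
let httpClient = new HttpClient()
let downloadString (url: string) : Async<string> =
    httpClient.GetStringAsync(url) |> Async.AwaitTask
```

With a live endpoint, `getStockDataWith downloadString url` would produce the workflow that the rest of the example consumes.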
You can test getStockData using:
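In the REPL, a workflow is typically tried out with `Async.RunSynchronously`. The sketch below uses a hypothetical stub in place of the real workflow so that it runs offline, and it also shows `Async.Catch`, which turns a failure such as a network error into a value instead of an exception.

```fsharp
// Hypothetical stand-in for the real workflow, so the snippet runs offline.
let getStockData : Async<string[]> =
    async {
        do! Async.Sleep 10                 // simulate network latency
        return [| "MSFT"; "GOOG" |]
    }

// Blocks the REPL until the workflow finishes and returns its result.
let r = getStockData |> Async.RunSynchronously

// Async.Catch wraps the outcome in a Choice instead of throwing.
let failing : Async<string[]> = async { return failwith "network down" }

let outcome =
    match failing |> Async.Catch |> Async.RunSynchronously with
    | Choice1Of2 stocks -> sprintf "got %d stocks" stocks.Length
    | Choice2Of2 error -> sprintf "failed: %s" error.Message
```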
Create Observable Sequences.
- Create an "event" (observable) every 5 seconds (fast, for testing purposes).
- Take 3 elements.
- Get the Stocks information asynchronously.
- Split it into individual Stocks.
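The four steps above can be sketched as one pipeline. This is a hypothetical reconstruction: the record is trimmed, `getStockData` is a stub that returns fixed illustrative values, and `stockSequence` is a name introduced here. `interval`, `take` and `ofAsync` come from FSharp.Control.Reactive, while `SelectMany` is called on the Rx `Observable` class with explicit delegate types to pick the intended overload.

```fsharp
#r "nuget: System.Reactive"
#r "nuget: FSharp.Control.Reactive"

open System
open FSharp.Control.Reactive

type Stocks = { Symbol: string; LatestValue: float }   // trimmed for the sketch

// Hypothetical stub standing in for the real download workflow.
let getStockData : Async<Stocks[]> =
    async {
        return
            [| { Symbol = "MSFT"; LatestValue = 10.5 }
               { Symbol = "GOOG"; LatestValue = 20.25 } |]
    }

/// tick every `period` -> take 3 -> fetch asynchronously -> one element per stock.
let stockSequence (period: TimeSpan) : IObservable<Stocks> =
    let ticks = Observable.interval period
    let firstThree = ticks |> Observable.take 3
    let batches =
        System.Reactive.Linq.Observable.SelectMany(
            firstThree,
            Func<int64, IObservable<Stocks[]>>(fun _ -> Observable.ofAsync getStockData))
    System.Reactive.Linq.Observable.SelectMany(
        batches,
        Func<Stocks[], seq<Stocks>>(fun stocks -> Seq.ofArray stocks))

// Nothing runs yet: the sequence only starts ticking when someone subscribes.
let oStock = stockSequence (TimeSpan.FromSeconds 5.0)
```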
Observe with different Observers
Comments:
Case 1:
- The main thread (1) is not blocked.
- The Observable Sequence is blocking Thread 12.
- The Observer (OnNext) is called in the same thread as the Observable Sequence.
Case 2:
- The Observer (OnNext) is called in the same thread as the Observable Sequence.
Case 3:
- No Threads are blocked.
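The effect of the scheduler on the thread that runs OnNext can be checked with a small experiment. The sketch below is an assumption-laden illustration, not the original test: it uses a synchronous `Observable.Range` source instead of the timer so the result is deterministic, `NewThreadScheduler.Default` as the scheduler, and a hypothetical helper `onNextThread`.

```fsharp
#r "nuget: System.Reactive"
#r "nuget: FSharp.Control.Reactive"

open System
open System.Threading
open System.Reactive.Concurrency
open FSharp.Control.Reactive

/// Subscribes to `source`, waits for it to finish, and returns the id of the
/// thread on which the last OnNext ran (-1 if nothing arrived).
let onNextThread (source: IObservable<int>) : int =
    let seen = ref (-1)
    use finished = new ManualResetEventSlim(false)
    let onNext (_: int) = seen.Value <- Thread.CurrentThread.ManagedThreadId
    let onError (_: exn) = finished.Set()
    let onCompleted () = finished.Set()
    use subscription = source |> Observable.subscribeWithCallbacks onNext onError onCompleted
    finished.Wait(TimeSpan.FromSeconds 5.0) |> ignore
    seen.Value

let mainThread = Thread.CurrentThread.ManagedThreadId
let source = System.Reactive.Linq.Observable.Range(1, 3)
let newThread = NewThreadScheduler.Default :> IScheduler

// Plain subscription: this source emits on the subscribing thread.
let direct = onNextThread source
// subscribeOn: the subscription, and with it the source, runs on the scheduler.
let subscribed = onNextThread (source |> Observable.subscribeOn newThread)
// observeOn: the source runs where it is, the notifications hop to the scheduler.
let observed = onNextThread (source |> Observable.observeOn newThread)

printfn "main=%d direct=%d subscribeOn=%d observeOn=%d" mainThread direct subscribed observed
```

With a timer-based source such as an interval, even the plain subscription receives its values on a timer thread, which is why the main thread is not blocked in Case 1.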
Case 1 (using the Dump1 observer)
Use d1.Dispose() to stop it.
Case 2 (using the Dump2 observer)
Use d2.Dispose() to stop it.
Case 3 (using the Dump3 observer)
Use d3.Dispose() to stop it.
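Subscribing returns an `IDisposable`, and disposing it is what stops the observation. A minimal sketch of that life cycle follows, assuming a fast 20 ms interval and a hypothetical `received` queue in place of the Dump observers.

```fsharp
#r "nuget: System.Reactive"
#r "nuget: FSharp.Control.Reactive"

open System
open System.Threading
open System.Collections.Concurrent
open FSharp.Control.Reactive

// Thread-safe store, because the ticks arrive on a timer thread.
let received = ConcurrentQueue<int64>()

let ticks = Observable.interval (TimeSpan.FromMilliseconds 20.0)

// Subscribing starts the timer; `d` is the handle used to stop it.
let d = ticks |> Observable.subscribe (fun tick -> received.Enqueue tick)

Thread.Sleep 300                    // let a few ticks arrive
d.Dispose()                         // stop observing
Thread.Sleep 50                     // let any in-flight tick settle
let countAtDispose = received.Count

Thread.Sleep 200                    // no further ticks are expected
let countLater = received.Count
printfn "received %d ticks, %d after waiting" countAtDispose countLater
```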
The output of Case 1 is the following:
The output of Case 2 is the following:
The output of Case 3 is the following:
Query Style
Repeat the same example using Query style programming.
- Create an "event" (observable) every 5 seconds (fast, for testing purposes).
- Use observe to create an Observable Sequence that gets the Stocks data asynchronously.
- Use rxquery to take 5 elements of the Stocks Observable Sequence.
- Use observe to split it into Observable Sequences of individual Stocks.
Use d3a.Dispose() and d3b.Dispose() to stop observation.
To do:
- Make the Query style solution more efficient. To keep it simple, this Query solution creates one Observable Sequence for each Stock, so the Stock data is downloaded from the internet once per Stock.