Computer Science Experimentation

Saturday, December 25, 2010

WCF with F# interactive – Data Server example

Windows Communication Foundation (WCF) provides a runtime environment for services with the following characteristics:
- Exposes CLR types as services and consumes other services as CLR types. The conversion between protocols is done behind the scenes, so all coding is done as if you were in a plain .NET environment.
- Implements the concept of remote objects with concurrency management (introduced by .NET Remoting).
- Implements a set of industry-standard protocols (TCP, HTTP, WSDL, REST).
- Implements fast local communication over Windows named pipes.
- Allows multiple bindings for the same .NET service code.
- Provides Hosting.
- Manages Fault, Security and Transactions.

The following example implements a client/server data server.
The data server is a service that can be used alone or added to an application engine. It provides efficient storage and retrieval of data, and it allows points to be grouped to improve reading efficiency. The following example serves float data. Other data types such as integer, double and string can be added. Another option is to create a generic contract and use a separate server for each type of data.
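
The "one server per data type" idea can be sketched without any WCF code. The following is a minimal in-memory sketch only: DataStore, Register and TryGet are hypothetical names that do not appear in the example, and the 0/1 return codes simply follow the convention used by the data server below.

```fsharp
open System.Collections.Generic

// Hypothetical generic store: 'T is the value type served by one instance.
type DataStore<'T>() =
    let index = Dictionary<string, int>()   // tag -> position in values
    let values = List<'T>()                 // append-only storage
    // Returns 0 on success and 1 if the tag is already registered.
    member this.Register(tag: string, value: 'T) =
        if index.ContainsKey(tag) then 1
        else
            index.[tag] <- values.Count
            values.Add(value)
            0
    // Returns Some value for a known tag, None otherwise.
    member this.TryGet(tag: string) =
        match index.TryGetValue(tag) with
        | true, i -> Some (values.[i])
        | _ -> None

// One store instance per data type.
let floats = DataStore<float>()
let strings = DataStore<string>()
floats.Register("Tag_1", 5.0) |> ignore
strings.Register("Name_1", "pump") |> ignore
printfn "%A %A" (floats.TryGet("Tag_1")) (strings.TryGet("Name_1"))
```

Whether such a generic type can be exposed directly as a service contract is a separate question that this sketch does not address.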

The data server provides the same features as OPC-DA, with the additional possibility of extra bindings to pipes, HTTP and other web-service protocols. In the same way, OPC-DA .NET 3.0 (1.2) is a wrapper on the OPC server (based on COM) that provides the same extra functionality.

To make development, visualization and dynamic modification of the application easy, this example was developed as scripts that are executed in the F# Interactive environment. To get IntelliSense, 2 instances of Visual Studio with F# Interactive are used.
The first VS is used to develop the service contract and to develop and execute the server in F# Interactive.

The second VS is used to develop and execute the client in F# interactive.

In this example, a Windows named-pipe binding without any security checks is used.

In order to allow all code development inside F# scripts, the server and client binding configuration was done programmatically. For the same reason, the client works directly with the channel.

Both VS instances were opened in administrator mode to avoid any security blocking (for the file C:\Program Files (x86)\Microsoft Visual Studio 10.0\Common7\IDE\devenv.exe).

The file DS_Contract.fsx implements the data server contract.

This example code includes:
- Definition of DataItem type.
- Definition of IDataServer type with abstract members that give the signatures of all remote methods (interface).
- Definition of DataServer type with the implementation of remote and local methods. The local methods can only be used inside the server host.

This server can be used stand-alone or embedded in another process, since it executes in a separate thread to respond to the clients. In the stand-alone case, all the management of data items is done remotely. In the embedded case, the local application can manage the data items.

The data items are kept in an append-only list, so an item can't be removed or moved after it is inserted; only its value and status can be updated. To change the set of items, the application needs to generate the full list of data items again. This property provides maximum and consistent data reading speed.

The data server uses a dictionary to relate the item name (tag) to the position of the data list.

In order to increase reading speed, the client should create groups of data. A group holds an array of positions in the data list. A group should be deleted when it is no longer needed.
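
The relationship between the list, the tag dictionary and the groups can be sketched in isolation. This is a simplified illustration, not the server code: Item, items, index, groups, register, addGroup and readGroup are hypothetical names, and locking and error codes are left out.

```fsharp
open System.Collections.Generic

// Hypothetical, simplified item: only a tag and a value.
type Item = { Tag: string; mutable Value: float }

let items = List<Item>()                    // append-only storage
let index = Dictionary<string, int>()       // tag -> position in items
let groups = Dictionary<string, int[]>()    // group name -> positions in items

// Appends an item and remembers its position; returns false on a duplicate tag.
let register (tag: string) (value: float) =
    if index.ContainsKey(tag) then false
    else
        index.[tag] <- items.Count
        items.Add({ Tag = tag; Value = value })
        true

// The tag names are resolved to positions once, when the group is created.
let addGroup (name: string) (tags: string[]) =
    groups.[name] <- tags |> Array.map (fun t -> index.[t])

// Reading a group only indexes into the list; no name lookups are needed.
let readGroup (name: string) =
    groups.[name] |> Array.map (fun i -> items.[i].Value)

register "A" 1.0 |> ignore
register "B" 2.0 |> ignore
register "C" 3.0 |> ignore
addGroup "G" [| "A"; "C" |]
printfn "%A" (readGroup "G")
```

Because positions never change once an item is appended, the arrays stored in a group stay valid for the lifetime of the list.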

DS_Contract.fsx code:
#r "System.ServiceModel"
#r "System.Runtime.Serialization"

//namespace RTE.DataServer

open System
open System.ServiceModel
open System.Collections.Concurrent
open System.Collections.Generic
open System.Runtime.Serialization

    [<DataContract(Namespace="http://RTE/DataServer")>] 
    type DataItem()=
        let mutable Tag_=""
        let mutable Description_=""
        let mutable Value_=0.0
        let mutable Status_=0
        [<DataMember>]
        member this.Tag with get()=Tag_ and set x = Tag_<-x
        [<DataMember>]
        member this.Description with get()=Description_ and set x = Description_<-x
        [<DataMember>]
        member this.Value with get()=Value_ and set x = Value_<-x
        [<DataMember>]
        member this.Status with get()=Status_ and set x = Status_<-x

        override this.ToString()=
            sprintf "Tag=%s, Description=%s Value=%f, Status=%i" Tag_ Description_ Value_ Status_ 


    //service contract
    [<ServiceContract(Namespace="http://RTE/DataServer")>]
    type IDataServer=
        [<OperationContract>]
        abstract SetDataValueByName : tag:string -> value:float -> int
        [<OperationContract>]
        abstract SetDataStatusByName : tag:string -> value:int -> int
        [<OperationContract>]
        abstract GetDataValueByName : tag:string -> float*int

        [<OperationContract>]
        abstract RegisterData : tag:DataItem -> int
        [<OperationContract>]
        abstract RegisterDataArray : tags:DataItem[] -> int

        [<OperationContract>]
        abstract AddGroup : group:string -> tags:string[] -> int*int[]
        [<OperationContract>]
        abstract DeleteGroup : group:string -> int
        [<OperationContract>]
        abstract GetGroup : group:string -> int*DataItem[]
        [<OperationContract>]
        abstract GetGroupArray : group:string -> int*float[]*int[]


    //service implementation
    [<ServiceBehavior(InstanceContextMode=InstanceContextMode.Single,ConcurrencyMode=ConcurrencyMode.Single)>]
    type DataServer()=
        //Constructor & Fields
        let DataDict= new ConcurrentDictionary<string,int>()   //tag -> position in DataList
        let DataList= new List<DataItem>()
        let GroupDict= new ConcurrentDictionary<string,int[]>() //group -> positions in DataList
        let lockObj= ref 0
        //--------------------------------------------------------------------
        //Properties
        member this.dataDict with get()=DataDict
        member this.dataList with get()=DataList
        member this.groupDict with get()=GroupDict
        //--------------------------------------------------------------------
        //Local Methods
        //--------------------------------------------------------------------
        member this.SetDataValueByNameLocal (tag:string) (value:float) =
                lock(lockObj) (fun()->
                    try 
                        let i=DataDict.Item(tag)
                        DataList.Item(i).Value<-value
                        0
                    with
                    |_ -> 1    
                )
        //--------------------------------------------------------------------
        member this.SetDataStatusByNameLocal (tag:string) (status:int) =
                lock(lockObj) (fun()->
                    try 
                        let i=DataDict.Item(tag)
                        DataList.Item(i).Status<-status
                        0
                    with
                    |_ -> 1    
                )
        //--------------------------------------------------------------------
        member this.GetDataValueByNameLocal tag  = 
             lock(lockObj) (fun()->
                    try 
                        let i=DataDict.Item(tag)
                        DataList.Item(i).Value,0
                    with
                    |_ -> 0.0,1    
                )
        //--------------------------------------------------------------------
        //Group Methods
        member this.AddGroupLocal (group:string) (tags:string[])=
            if GroupDict.ContainsKey(group) then
                (1,[||])
            else
                let l= Array.zeroCreate(tags.Length)
                let e= Array.zeroCreate(tags.Length) //marks tags that don't exist
                //Check if all tags exist
                let mutable k=0
                let mutable j=0
                for i in tags do
                    if DataDict.ContainsKey(i) then
                        l.[k]<-DataDict.Item(i)
                    else
                        e.[k]<-1
                        j<-1
                    k<-k+1
                if j=0 then
                    //
                    GroupDict.TryAdd(group,l)|>ignore
                    (j,e)
                else
                    (j,e)
                
        //--------------------------------------------------------------------
        member this.DeleteGroupLocal (group:string)=
            if GroupDict.ContainsKey(group) then
                GroupDict.TryRemove(group)|>ignore
                0
            else
                1
        //--------------------------------------------------------------------
        member this.GetGroupLocal (group:string)=
            if GroupDict.ContainsKey(group) then
                let l=GroupDict.Item(group)
                let a= Array.zeroCreate(l.Length)
                let mutable k=0
                for i in l do
                    a.[k]<-DataList.[i]
                    k<-k+1
                (0,a)
            else
                (1,[||])
        //--------------------------------------------------------------------
        member this.GetGroupArrayLocal (group:string)=
            if GroupDict.ContainsKey(group) then
                let l=GroupDict.Item(group)
                let s= Array.zeroCreate(l.Length)
                let v : float array= Array.zeroCreate(l.Length)
                let mutable k=0
                for i in l do
                    v.[k]<-DataList.[i].Value                    
                    s.[k]<-DataList.[i].Status
                    k<-k+1
                (0,v,s)
            else
                (1,[||],[||])
        //--------------------------------------------------------------------    
        member this.RegisterDataLocal (tag:DataItem) = 
            lock(lockObj) (fun()->
                if DataDict.TryAdd(tag.Tag,DataList.Count) then
                    DataList.Add(tag)
                    0
                else
                    1)
        //--------------------------------------------------------------------
        member this.RegisterDataArrayLocal (tags:DataItem[])  = 
                //this.RegisterTag(i) 
            lock(lockObj) (fun()->
                //reject the whole array if any tag is already registered
                if tags |> Array.exists (fun t -> DataDict.ContainsKey(t.Tag)) then
                    1
                else
                    for i in tags do
                        DataDict.TryAdd(i.Tag,DataList.Count)|>ignore
                        DataList.Add(i)
                    0)                
        //--------------------------------------------------------------------

        interface IDataServer with

            member this.AddGroup (group:string) (tags:string[])=
                   this.AddGroupLocal group tags
            //----------------------------------------------------------------
            member this.DeleteGroup group=
                   this.DeleteGroupLocal group
           //-----------------------------------------------------------------
            member this.GetGroup group=
                   this.GetGroupLocal group
            //----------------------------------------------------------------
            member this.SetDataValueByName tag value =
                this.SetDataValueByNameLocal tag value
            //----------------------------------------------------------------        
            member this.GetDataValueByName tag  =
                this.GetDataValueByNameLocal tag 
            //----------------------------------------------------------------
            member this.SetDataStatusByName tag status =
                this.SetDataStatusByNameLocal tag status
            //----------------------------------------------------------------
            member this.RegisterData (tag:DataItem) = 
                this.RegisterDataLocal(tag)
            //----------------------------------------------------------------
            member this.RegisterDataArray (tags:DataItem[])=
                this.RegisterDataArrayLocal(tags)
            //----------------------------------------------------------------
            member this.GetGroupArray (group:string)=
                this.GetGroupArrayLocal group
            //----------------------------------------------------------------

The file DS_Server.fsx implements the Data Server host.
DS_Server.fsx code:
First, the service is started at the server F# interactive window.

//Server Host
#r "System.ServiceModel"
#r "System.Runtime.Serialization"

//namespace RTE.DataServer

open System
open System.ServiceModel
open System.Collections.Concurrent
open System.Collections.Generic
open System.Runtime.Serialization

#I @"C:\Users\caxelrud\Documents\Visual Studio 2010\Projects\MailboxWCF"
#load "DS_Contract.fsx"

let DS= new DS_Contract.DataServer()


let h=
    //System.ServiceModel
    let baseAddress= new Uri("net.pipe://RTE/dataserver/ds1")
    let s = new ServiceHost(DS,[|baseAddress|])
    //Programmatic Binding
    let binding=
        let b=
            new NetNamedPipeBinding(securityMode=NetNamedPipeSecurityMode.None)
        b 
    s.AddServiceEndpoint(typeof<DS_Contract.IDataServer>,binding,baseAddress) |>ignore
    s

h.Open()

Now, we can use local methods to register and manipulate data items in the server F# interactive window.


// Test Local 
//Item
let tag1=new DS_Contract.DataItem(Tag="Tag_1",Description="my tag 1",Value=5.0);;
> val tag1 : DS_Contract.DataItem =
Tag=Tag_1, Description=my tag 1 Value=5.000000, Status=0

let r1=DS.RegisterDataLocal(tag1)
printfn "Tag_1 value=%A" (DS.GetDataValueByNameLocal("Tag_1"));;
> Tag_1 value=(5.0, 0)

let r2=DS.SetDataValueByNameLocal "Tag_1" 10.0
printfn "Tag_1 value=%A" (DS.GetDataValueByNameLocal("Tag_1"));;
> Tag_1 value=(10.0, 0)

printfn "tag 1=%f" (DS.dataList.Item(DS.dataDict.Item("Tag_1")).Value) //only local operation
> tag 1=10.000000

//Array
let tag2=new DS_Contract.DataItem(Tag="Tag_2",Description="my tag 2",Value=2.0)
let tag3=new DS_Contract.DataItem(Tag="Tag_3",Description="my tag 3",Value=3.0)
let tag4=new DS_Contract.DataItem(Tag="Tag_4",Description="my tag 4",Value=4.0)
let r3=DS.RegisterDataArrayLocal([|tag2;tag3;tag4|])
printfn "value=%A" (DS.GetDataValueByNameLocal("Tag_2")) 
> value=(2.0, 0)

//only local FSI
DS.dataDict;;
> val it : ConcurrentDictionary<string,int> =
dict [("Tag_2", 1); ("Tag_4", 3); ("Tag_1", 0); ("Tag_3", 2)]

DS.dataList;;
> val it : List<DS_Contract.DataItem> =
seq
[Tag=Tag_1, Description=my tag 1 Value=10.000000, Status=0
{Description = "my tag 1";
Status = 0;
Tag = "Tag_1";
Value = 10.0;};
Tag=Tag_2, Description=my tag 2 Value=2.000000, Status=0
{Description = "my tag 2";
Status = 0;
Tag = "Tag_2";
Value = 2.0;};
Tag=Tag_3, Description=my tag 3 Value=3.000000, Status=0
{Description = "my tag 3";
Status = 0;
Tag = "Tag_3";
Value = 3.0;};
Tag=Tag_4, Description=my tag 4 Value=4.000000, Status=0
{Description = "my tag 4";
Status = 0;
Tag = "Tag_4";
Value = 4.0;}]

//Group
let r4=DS.AddGroupLocal "G1" [|"Tag_1";"Tag_2"|];;
> val r4 : int * int [] = (0, [|0; 0|])

let r5=DS.AddGroupLocal "G2" [|"Tag_1";"Tag_2";"Tag_4"|]
> val r5 : int * int [] = (0, [|0; 0; 0|])

let r6=DS.GetGroupLocal "G1";;
> val r6 : int * DS_Contract.DataItem [] =
(0,
[|Tag=Tag_1, Description=my tag 1 Value=10.000000, Status=0;
Tag=Tag_2, Description=my tag 2 Value=2.000000, Status=0|])

printfn "transaction status=%d, value=%A" (fst r6) (snd r6);;
> transaction status=0, value=[|Tag=Tag_1, Description=my tag 1 Value=10.000000, Status=0;
Tag=Tag_2, Description=my tag 2 Value=2.000000, Status=0|]

let r7,r8,r9=DS.GetGroupArrayLocal "G1"
printfn "transaction status=%d, value=%A, status=%A" r7 r8 r9;;
> transaction status=0, value=[|10.0; 2.0|], status=[|0; 0|]

//only local FSI
DS.groupDict;;
> val it : ConcurrentDictionary<string,int []> =
dict [("G2", [|0; 1; 3|]); ("G1", [|0; 1|])]


//-------------------------------------------------------------------------
Console.WriteLine("Server is running. Press return to exit");;
Console.ReadLine()|>ignore
h.Close() //close it before updates !

DS_Client.fsx code:
The client is started at the client F# interactive window.

//Client
#r "System.ServiceModel"
#r "System.Runtime.Serialization"

open System
open System.ServiceModel
open System.Collections.Concurrent
open System.Collections.Generic
open System.Runtime.Serialization

#I @"C:\Users\caxelrud\Documents\Visual Studio 2010\Projects\MailboxWCF"
#load "DS_Contract.fsx"

let binding=new NetNamedPipeBinding(securityMode=NetNamedPipeSecurityMode.None)
let address=new EndpointAddress("net.pipe://RTE/dataserver/ds1")
let factory= new ChannelFactory<DS_Contract.IDataServer>(binding,address)
let channel=factory.CreateChannel()

// Remote Test
let tag10=new DS_Contract.DataItem(Tag="Tag_10",Description="my tag 10",Value=5.0);;
> val tag10 : DS_Contract.DataItem =
  Tag=Tag_10, Description=my tag 10 Value=5.000000, Status=0

let r1=channel.RegisterData(tag10)
printfn "Tag_10 value=%A" (channel.GetDataValueByName("Tag_10"));;
> Tag_10 value=(5.0, 0)

let r2=channel.SetDataValueByName "Tag_10" 10.0
printfn "Tag_10 value=%A" (channel.GetDataValueByName("Tag_10"));;
> Tag_10 value=(10.0, 0)

//Array
let tag20=new DS_Contract.DataItem(Tag="Tag_20",Description="my tag 20",Value=2.0)
let tag30=new DS_Contract.DataItem(Tag="Tag_30",Description="my tag 30",Value=3.0)
let tag40=new DS_Contract.DataItem(Tag="Tag_40",Description="my tag 40",Value=4.0)
let r3=channel.RegisterDataArray([|tag20;tag30;tag40|])
printfn "value=%A" (channel.GetDataValueByName("Tag_20"));; 
> value=(2.0, 0)

//Group
let r4=channel.AddGroup "G10" [|"Tag_10";"Tag_20"|];;
let r5=channel.AddGroup "G20" [|"Tag_10";"Tag_20";"Tag_40"|];;
let r6=channel.GetGroup "G10";;
printfn "transaction status=%d, value=%A" (fst r6) (snd r6);;
> transaction status=0, value=[|Tag=Tag_10, Description=my tag 10 Value=10.000000, Status=0;
  Tag=Tag_20, Description=my tag 20 Value=2.000000, Status=0|]

let r7,r8,r9=channel.GetGroupArray "G10"
printfn "transaction status=%d, value=%A, status=%A" r7 r8 r9;;
> transaction status=0, value=[|10.0; 2.0|], status=[|0; 0|]

Wednesday, December 15, 2010

Agent based example using F# MailboxProcessor

Introduction

The following simple example shows how to use the F# MailboxProcessor to develop an Agent (or Actor) based application.
Each Agent is a state machine.
Each Agent acts on the messages it receives, for example by changing state or by replying to the caller.
The actions can be performed asynchronously, which allows the application to scale easily while keeping performance.

You can test and debug the application in F# Interactive by sending messages to the Agents and receiving their replies interactively.
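
As a minimal sketch of the asynchronous style, the small counter agent below is queried with PostAndAsyncReply instead of the blocking PostAndReply. CounterMsg, counter and the message names are hypothetical and are not part of the example that follows.

```fsharp
// Hypothetical message type for a small counter agent.
type CounterMsg =
    | Add of int
    | Get of AsyncReplyChannel<int>

let counter =
    MailboxProcessor.Start(fun inbox ->
        // The running total is the agent's only state.
        let rec loop total =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Add n -> return! loop (total + n)
                | Get reply ->
                    reply.Reply(total)
                    return! loop total
            }
        loop 0)

counter.Post(Add 2)
counter.Post(Add 3)

// PostAndAsyncReply returns an Async<int>; nothing blocks until it is run.
let query = counter.PostAndAsyncReply(fun reply -> Get reply)
let total = Async.RunSynchronously query
printfn "Total %d" total
```

Messages are processed in the order they were posted, so the reply reflects both Add messages. Inside another async block the query could be awaited with let! instead of Async.RunSynchronously.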

Example

The example uses 3 Agents.

agent2 and agent3 are of type agentCalc. They receive the messages 'Two' and 'Three' together with an integer, and multiply the received number by 2 or by 3 depending on the message.

agent1 has 2 states: idle and running. It also keeps a totalizer.
It receives the messages 'Start', 'Exit', 'Reset', 'GoIdle', 'Next', 'Show' and 'State'.
These messages may have a different meaning depending on the state of agent1.
For example:
In 'idle':
   - 'Start': changes the state to 'running'
   - 'Exit': the agent stops execution
In 'running':
    - 'GoIdle': changes the state to 'idle'
    - 'Reset': the totalizer goes to 0
    - 'Next': generates 2 random integers, one in 0..1 and one in 0..9.
                The first number decides whether the message to be sent is 'Two' or 'Three'.
                Then it sends that message to agent2 and agent3, passing the second number.
                The numbers returned by agent2 and agent3 are added to the totalizer.
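
The transitions listed above can also be written as a pure function, which is easy to check without starting any agent. This is only a sketch: State, Msg and step are hypothetical names, 'Next', 'Show' and 'State' are left out, and treating every unlisted message as a no-op is an assumption.

```fsharp
// Hypothetical names: State, Msg and step are illustrative only.
type State = Idle | Running
type Msg = Start | GoIdle | Reset | Exit

// Returns the next state and totalizer, or None when the agent should stop.
let step (state, total) msg =
    match state, msg with
    | Idle, Start -> Some (Running, total)
    | Idle, Exit -> None
    | Running, GoIdle -> Some (Idle, total)
    | _, Reset -> Some (state, 0)
    | _ -> Some (state, total)   // assumed: other messages change nothing

// Feed a sequence of messages through the machine, starting idle with a total of 7.
let final =
    [ Start; Reset; GoIdle ]
    |> List.fold (fun acc m -> acc |> Option.bind (fun s -> step s m)) (Some (Idle, 7))
printfn "%A" final
```

Keeping the transition logic in one function like this makes each state/message pair visible in a single match expression.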

The following script implements the agents:
open System

type msg1 =
    | Start
    | Exit
    | Reset
    | GoIdle
    | Next
    | Show of AsyncReplyChannel<int>
    | State of AsyncReplyChannel<string>


type msg2 =
    Two of int*AsyncReplyChannel<int> | Three of int*AsyncReplyChannel<int>
    


let agentCalc ()=
    MailboxProcessor.Start(fun i ->
        let rec running =
            async {
                let! msg= i.Receive()
                match msg with
                | Two (n,replyChannel) ->
                    do replyChannel.Reply(n*2)
                    return! running
                | Three (n, replyChannel) ->
                    do replyChannel.Reply(n*3)
                    return! running
                }
        running )

let agent2= agentCalc ()
let agent3= agentCalc ()

let mutable total=0
let agent1=
        MailboxProcessor.Start(fun i ->
            let rec idle (total:int)=
                async {
                    let! msg= i.Receive()
                    match msg with
                    | Reset ->
                        let total=0
                        return! idle total
                    | Exit ->
                        return ()
                    | Start ->
                        return! running total
                    | Show (replyChannel)->
                        do replyChannel.Reply(total)
                        return! idle total
                    | State (replyChannel)->
                        do replyChannel.Reply("idle")
                        return! idle total
                    | _->
                        return! idle total                        
                    }
            and running total= 
                async{
                    let! msg= i.Receive()
                    match msg with
                    | Next ->
                        let z=new Random()
                        let n1=z.Next(2)  //0..1 (the upper bound is exclusive)
                        let n2=z.Next(10) //0..9
                        let total=
                                if n1=0 then
                                    let r1=agent2.PostAndReply(fun replyChannel->Two(n2,replyChannel))
                                    let r2=agent3.PostAndReply(fun replyChannel->Two(n2,replyChannel))
                                    total+r1+r2
                                else
                                    let r1=agent2.PostAndReply(fun replyChannel->Three(n2,replyChannel))
                                    let r2=agent3.PostAndReply(fun replyChannel->Three(n2,replyChannel))
                                    total+r1+r2
                        return! running total
                    | GoIdle ->
                        return! idle total
                    | Show (replyChannel) ->
                        do replyChannel.Reply(total)
                        return! running total
                    | State (replyChannel)->
                        do replyChannel.Reply("running")
                        return! running total
                    | Reset ->
                        let total=0
                        return! running total
                    | _->
                        return! running total //ignore other messages and stay in 'running'
                }
            idle total)


The following script tests the agents interactively:

let r1 = agent1.PostAndReply(fun reply -> State(reply))
printfn "State %s" r1 |>ignore;;
State idle 

let r2 = agent1.PostAndReply(fun reply -> Show(reply))
printfn "Total %d" r2 |>ignore;;
Total 0 

agent1.Post(Start)
let r3 = agent1.PostAndReply(fun reply -> State(reply))
printfn "State %s" r3 |>ignore;;
State running 

for i in 1..5 do
    agent1.Post(Next)
    let r4 = agent1.PostAndReply(fun reply -> Show(reply))
    printfn "Total %d" r4 |>ignore;;

Total 24
Total 48
Total 72
Total 96
Total 120

agent1.Post(Reset)
let r5 = agent1.PostAndReply(fun reply -> Show(reply))
printfn "Total %d" r5 |>ignore;;
Total 0 

agent1.Post(GoIdle)
let r6 = agent1.PostAndReply(fun reply -> State(reply))
printfn "State %s" r6 |>ignore;;
State idle 

agent1.Post(Exit);;