Computer Science Experimentation

Saturday, May 16, 2015

F# and MQTT - example 1:

This post presents a F# Interactive Script example of the MQTT protocol.
Using the same client node, the example does Publish and Subscribe, of Simple and Complex data, using Json and Binary serialization. The following software were used:
-Server: Mosquitto
-Client: M2Mqtt
-Serialization: Json: Json.Net, Binary: FSPickler

MQTT

MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport.
MQTT v3.1.1 has now become an OASIS Standard.
MQTT stands for MQ Telemetry Transport. It is a publish/subscribe, extremely simple and lightweight messaging protocol, designed for constrained devices and low-bandwidth, high-latency or unreliable networks. The design principles are to minimize network bandwidth and device resource requirements whilst also attempting to ensure reliability and some degree of assurance of delivery. These principles also turn out to make the protocol ideal of the emerging “machine-to-machine” (M2M) or “Internet of Things” world of connected devices, and for mobile applications where bandwidth and battery power are at a premium.

Concepts

Publish/Subscribe:

The MQTT protocol is based on the principle of publishing messages and subscribing to topics, or "pub/sub". Multiple clients connect to a broker and subscribe to topics that they are interested in. Clients also connect to the broker and publish messages to topics. Many clients may subscribe to the same topics and do with the information as they please. The broker and MQTT act as a simple, common interface for everything to connect to. This means that you if you have clients that dump subscribed messages to a database, to Twitter or even a simple text file, then it becomes very simple to add new sensors or other data input to a database,Twitter or so on.

Topics/Subscriptions: 

Messages in MQTT are published on topics. There is no need to configure a topic, publishing on it is enough. Topics are treated as a hierarchy, using a slash (/) as a separator. This allows sensible arrangement of common themes to be created, much in the same way as a filesystem. For example, multiple computers may all publish their hard drive temperature information on the following topic, with their own computer and hard drive name being replaced as appropriate:sensors/COMPUTER_NAME/temperature/HARDDRIVE_NAME Clients can receive messages by creating subscriptions. A subscription may be to an explicit topic, in which case only messages to that topic will be received, or it may include wildcards. Two wildcards are available, + or #.
+ can be used as a wildcard for a single level of hierarchy. It could be used with the topic above to get information on all computers and hard drives as follows:sensors/+/temperature/+ # can be used as a wildcard for all remaining levels of hierarchy. This means that it must be the final character in a subscription. With a topic of "a/b/c/d", the following example subscriptions will match: a/b/#, a/b/c/#, +/b/c/# Zero length topic levels are valid.

Clean session / Durable connections:

 On connection, a client sets the "clean session" flag, which is sometimes also known as the "clean start" flag. If clean session is set to false, then the connection is treated as durable. This means that when the client disconnects, any subscriptions it has will remain and any subsequent QoS 1 or 2 messages will be stored until it connects again in the future. If clean session is true, then all subscriptions will be removed for the client when it disconnects. Will: When a client connects to a broker, it may inform the broker that it has a will. This is a message that it wishes the broker to send when the client disconnects unexpectedly. The will message has a topic, QoS and retain status just the same as any other message.

Retained Messages: 

All messages may be set to be retained. This means that the broker will keep the message even after sending it to all current subscribers. If a new subscription is made that matches the topic of the retained message, then the message will be sent to the client. This is useful as a "last known good" mechanism. If a topic is only updated infrequently, then without a retained message, a newly subscribed client may have to wait a long time to receive an update. With a retained message, the client will receive an instant update. Quality of Service: MQTT defines three levels of Quality of Service (QoS). The QoS defines how hard the broker/client will try to ensure that a message is received. Messages may be sent at any QoS level, and clients may attempt to subscribe to topics at any QoS level. This means that the client chooses the maximum QoS it will receive. For example, if a message is published at QoS 2 and a client is subscribed with QoS 0, the message will be delivered to that client with QoS 0. If a second client is also subscribed to the same topic, but with QoS 2, then it will receive the same message but with QoS 2. For a second example, if a client is subscribed with QoS 2 and a message is published on QoS 0, the client will receive it on QoS 0. Higher levels of QoS are more reliable, but involve higher latency and have higher bandwith requirements. 0: The broker/client will deliver the message once, with no confirmation. 1: The broker/client will deliver the message at least once, with confirmation required. 2: The broker/client will deliver the message exactly once by using a four step handshake.

REMARKS

Windows Installation: Mosquitto is installed in "Program Files (x86)" where the files are read-only and requires Admin priviledges. For now, to update the configuration file, make a copy to another folder, do the update and copy it back.

F# Example 1

Script Code:

//M2MqttTest_1.fsx
//Celso Axelrud
//rev.: 5/16/2015-4:15pm

(*
MQTT example 1: Same client node Pub/Sub of simple and complex data using json and binary serialization.
 
Server: Mosquitto
Client: M2Mqtt
Serialization:
    -Json: Json.Net
    -Binary: FSPickler
*)

//Libraries-----------------------
#I @"C:\Project(comp)\Dev_2015\MQTT_1\MQTT_Proj_1\Lib"
#r "M2Mqtt.dll" 
#r "Newtonsoft.Json.dll"
#r "FsPickler.dll"

//Open System---------------------
open System
open System.Text

//Open M2Mqtt---------------------
open uPLibrary.Networking.M2Mqtt
open uPLibrary.Networking.M2Mqtt.Exceptions;
open uPLibrary.Networking.M2Mqtt.Messages;
open uPLibrary.Networking.M2Mqtt.Session;
open uPLibrary.Networking.M2Mqtt.Utility;
open uPLibrary.Networking.M2Mqtt.Internal;

//Create client node--------------
let node = new MqttClient(brokerHostName="localhost")

//Create subscription handles-----
//Received 
let MsgReceived (e:MqttMsgPublishEventArgs) =
    printfn "Sub Received Topic: %s" e.Topic
    printfn "Sub Received Qos: %u" e.QosLevel
    printfn "Sub Received Retain: %b" e.Retain
    printfn "Sub Received Message: %s" (Encoding.ASCII.GetString e.Message)
node.MqttMsgPublishReceived.Add(MsgReceived)

//Publish (requires QoS Level 1 or 2)
let MsgPublish (e:MqttMsgPublishedEventArgs) =
    printfn "Pub Message Published: %b " e.IsPublished
node.MqttMsgPublished.Add(MsgPublish)

//Subscribed (requires QoS Level 1 or 2)
let MsgSubscribed (e:MqttMsgSubscribedEventArgs) =
    printfn "Sub Message Subscribed: %s " (Encoding.ASCII.GetString e.GrantedQoSLevels) 
node.MqttMsgSubscribed.Add(MsgSubscribed)

//Unsubscribed (requires QoS Level 1 or 2) 
let MsgUnsubscribed (e:MqttMsgUnsubscribedEventArgs) =
    printfn "Sub Message Unsubscribed: %i " e.MessageId 
node.MqttMsgUnsubscribed.Add(MsgUnsubscribed)

//Connect-------------------------
node.Connect(clientId="Node1Conn1",username="caxelrud",password="laranja1",
                willRetain=false,willQosLevel=0uy,willFlag=true,willTopic="system/LWT/Node1Conn1",willMessage="offline",
                cleanSession=true,keepAlivePeriod=60us)

//Get node info (Interactive)-----
node;;
node.CleanSession;; node.ClientId;; node.Settings;;
node.WillFlag;; node.WillQosLevel;;
node.WillTopic;; node.WillMessage;;

//Subscribe-----------------------
let  topics1:string[] = [| "sensor/A10/TI100"; "sensor/A10/FI001" |]
let qosLevels1:byte[] = [|MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE; MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE |]
let grantedQos1 = node.Subscribe(topics1, qosLevels1)

//Subscribe to Connection Last Will 
let topics2:string[] = [| "system/LWT/Node1Conn1" |]
let qosLevels2:byte[] = [|MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE |]
let grantedQos2 = node.Subscribe(topics2, qosLevels2)

//Publish-------------------------
let mutable Temp1="100.0"
node.Publish("sensor/A10/TI100", Encoding.UTF8.GetBytes(Temp1))

//Publish-------------------------
Temp1<-"200.0"
node.Publish("sensor/A10/TI100", Encoding.UTF8.GetBytes(Temp1))

//Publish but no notification because we didn't subscribe to this point
node.Publish("sensor/A11/TI101", Encoding.UTF8.GetBytes("1.0"))

//Complex Messages----------------
//json----------------------------
open Newtonsoft.Json

//Type for a group of tags with group name and group date & time
type Tag={Name:string;Value:float;Qual:int}
type Rec={Name:string;Time:System.DateTime;Tags:Tag list}

//Subscribe-----------------------
let  topics3:string[] = [| "sensor/A10/Group1" |]
let qosLevels3:byte[] = [|MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE|]
let grantedQos3 = node.Subscribe(topics3, qosLevels3)

//Generate random values----------
let values1=[for i in 1..2 -> System.Random().NextDouble() ]

//Create the complex point--------
let tags1 = [
        { Name = "PI100"; Value = values1.[0];Qual=0 };
        { Name = "PI101"; Value = values1.[1];Qual=0 }
    ]
let grp1={Name="Group1";Time=System.DateTime.UtcNow;Tags=tags1}

//Serialize-----------------------
let grp1j = JsonConvert.SerializeObject(grp1)
//Deserialize (test)-------------- 
let grp1d = JsonConvert.DeserializeObject<Rec>(grp1j)
//Publish-------------------------
node.Publish("sensor/A10/Group1", message=Encoding.UTF8.GetBytes(grp1j),qosLevel=0uy,retain=true)

//binary serialization---------------
open Nessos.FsPickler

//Subscribe-----------------------
let  topics4:string[] = [| "sensor/A10/Group2" |]
let qosLevels4:byte[] = [|MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE|]
let grantedQos4 = node.Subscribe(topics4, qosLevels4)

//Create the complex point--------
let grp2={Name="Group2";Time=System.DateTime.UtcNow;Tags=tags1}
let binary = FsPickler.CreateBinary()

//Serialize-----------------------
let grp1b=binary.Pickle grp2

//Deserialize (test)-------------- 
let grp1db=binary.UnPickle<Rec> grp1b

//Publish-------------------------
node.Publish("sensor/A10/Group2", message=grp1b,qosLevel=0uy,retain=true)

//Unsubscribe---------------------
node.Unsubscribe(topics1)
node.Unsubscribe(topics2)
node.Unsubscribe(topics3)
node.Unsubscribe(topics4)

//Client Disconnect---------------
node.Disconnect()

Mosquitto Parameters:

#retry_interval 20 #sys_interval 10 #store_clean_interval 10 #pid_file #user mosquitto #max_inflight_messages 20 #max_queued_messages 100 #queue_qos0_messages false #message_size_limit 0 #allow_zero_length_clientid true #auto_id_prefix #persistent_client_expiration #allow_duplicate_messages false #upgrade_outgoing_qos false #bind_address #port 1883 #max_connections -1 #protocol mqtt #http_dir #use_username_as_clientid #cafile #capath #certfile #keyfile #tls_version #require_certificate false #use_identity_as_username false #crlfile #ciphers DEFAULT:!aNULL:!eNULL:!LOW:!EXPORT:!SSLv2:@STRENGTH #psk_hint #ciphers #listener #max_connections -1 #mount_point #protocol mqtt #http_dir #use_username_as_clientid #cafile #capath #certfile #keyfile #require_certificate false #crlfile #ciphers #psk_hint #ciphers #autosave_interval 1800 #autosave_on_changes false #persistence false #persistence_file mosquitto.db #persistence_location #log_dest stderr #log_facility #log_type error #log_type warning #log_type notice #log_type information #websockets_log_level 0 #connection_messages true #log_timestamp true #clientid_prefixes #auth_plugin #password_file #psk_file #acl_file #connection <name> #address <host>[:<port>] [<host>[:<port>]] #topic <topic> [[[out | in | both] qos-level] local-prefix remote-prefix] #bridge_attempt_unsubscribe true #round_robin false #remote_clientid #local_clientid #cleansession false #notifications true #notification_topic #keepalive_interval 60 #start_type automatic #restart_timeout 30 #idle_timeout 60 #threshold 10 #try_private true #remote_username #remote_password #bridge_cafile #bridge_capath #bridge_certfile #bridge_keyfile #bridge_insecure false #bridge_identity #bridge_psk #include_dir #ffdc_output #max_log_entries #trace_level #trace_output *)