LightMQTT

As I’m finally getting my realtime transit app ready for public release, it’s a good time to go through some things I’ve learned while open sourcing my code for the first time.

Transporter uses MQTT to receive vehicle data updates from the Digitransit APIs. While it would be possible to poll for this data repeatedly, there are simply too many vehicles on the map at once for that to be practical.

The MQTT feed publishes JSON payloads containing vehicle coordinates, delay compared to schedule, heading, next stop and other useful information. A sample message looks like this:

{"VP":{"dir":"1","spd":7.5,"source":"sm5logger","desi":"I","lat":60.20808,"hdg":135,"tst":"2017-06-04T19:21:20.000Z","line":"XXX","start":"2126","long":24.920565,"jrn":"XXX","tsi":1496604080,"dl":60,"veh":"H9171","oper":"XXX","oday":"2017-06-04","stop_index":22}}
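A payload like this maps quite naturally onto Swift's Decodable. The following is only a minimal sketch, not code from Transporter: the type names VehicleMessage and VehiclePosition are made up for illustration, only a subset of the fields is decoded, and it assumes every message carries the same fields and types as the sample above.

```swift
import Foundation

// Hypothetical model covering a subset of the fields in the sample payload.
struct VehiclePosition: Decodable {
    let veh: String   // vehicle identifier
    let desi: String  // designation
    let lat: Double   // latitude
    let long: Double  // longitude
    let hdg: Int      // heading
    let dl: Int       // delay compared to schedule
    let tsi: Int      // timestamp as Unix epoch seconds
}

// The payload wraps the vehicle position in a top-level "VP" object.
struct VehicleMessage: Decodable {
    let vp: VehiclePosition

    enum CodingKeys: String, CodingKey {
        case vp = "VP"
    }
}

// Decodes one MQTT message payload; throws if a field is missing or mistyped.
func decodeVehicle(from payload: Data) throws -> VehiclePosition {
    return try JSONDecoder().decode(VehicleMessage.self, from: payload).vp
}

let samplePayload = Data("""
{"VP":{"dir":"1","spd":7.5,"source":"sm5logger","desi":"I","lat":60.20808,"hdg":135,"tst":"2017-06-04T19:21:20.000Z","line":"XXX","start":"2126","long":24.920565,"jrn":"XXX","tsi":1496604080,"dl":60,"veh":"H9171","oper":"XXX","oday":"2017-06-04","stop_index":22}}
""".utf8)

if let vehicle = try? decodeVehicle(from: samplePayload) {
    print(vehicle.veh, vehicle.hdg, vehicle.dl)  // prints "H9171 135 60"
}
```

If the feed can omit fields, the corresponding properties would need to be optional.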

In my use case, publishing MQTT messages was irrelevant. What I needed was a compact, easily understandable Swift class, and I simply could not find one back then [1]. All available Swift libraries, like CocoaMQTT and Moscapsule, seemed to be just thin wrappers around Mosquitto, bringing along all its dependencies.

I had wanted to write a simple protocol client for quite some time, so soon after the first Swift release I had a simple MQTT client working. The MQTT specification was an important aid in getting the control packet bytes set up right. Along the way I learned about UnsafeMutablePointers and how to avoid copying data unnecessarily.

While the original LightMQTT implementation worked, it still needed more work in a few areas.

I came across a Stack Overflow post by @cocoaphony where he suggested reading from InputStream in a simple while loop, dispatched to a background thread. This was a way nicer approach than using the StreamDelegate callbacks for handling bytesAvailable events, and kept stream reading and eventual message parsing away from the main thread.

The current LightMQTT version dispatches reading from a newly opened InputStream to a background queue.

DispatchQueue.global(qos: strongSelf.options.readQosClass).async {
    strongSelf.readStream(input: streams.input, output: streams.output)
}

Reading from the InputStream is a blocking operation that waits for new bytes to arrive. After a 60-second timeout it either gives up and returns 0, or returns a negative value indicating an error. The while loop, and the whole background execution, only stays alive while the InputStream is open.

while messageParserState == .decodingHeader && input.streamStatus == .open {
    let count = input.read(messageBuffer, maxLength: 1)
    if count == 0 {
        break
    } else if count < 0 {
        return
    }

    if let message = PacketType(rawValue: messageBuffer.pointee & 0xf0) {
        messageType = message
        messageParserState = .decodingLength
        messageLengthMultiplier = 1
        messageLength = 0
    }
}
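Once the packet type is known, the parser moves on to the length field, which is where the multiplier comes in. In MQTT 3.1.1 the Remaining Length is a variable-length integer: each byte carries seven bits of the value, and the high bit signals that another byte follows, up to four bytes in total. The sketch below shows that decoding step in isolation; the function name decodeRemainingLength is made up, and unlike the streaming parser above it works on an array that is already in memory.

```swift
// Decodes an MQTT 3.1.1 "Remaining Length" field from the start of `bytes`.
// Returns the decoded value and the number of bytes consumed, or nil if the
// field is incomplete or runs past the four bytes the specification allows.
func decodeRemainingLength(_ bytes: [UInt8]) -> (value: Int, consumed: Int)? {
    var multiplier = 1
    var value = 0
    for (index, byte) in bytes.prefix(4).enumerated() {
        // The low seven bits carry data, least significant group first.
        value += Int(byte & 0x7f) * multiplier
        // A clear high bit marks the last byte of the field.
        if byte & 0x80 == 0 {
            return (value, index + 1)
        }
        multiplier *= 128
    }
    return nil
}

if let length = decodeRemainingLength([0xc1, 0x02]) {
    print(length.value, length.consumed)  // prints "321 2"
}
```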

The code recently received contributions from a fellow GitHub developer who nicely abstracted the details of a control packet away into their own struct. The UInt8 arrays also gave way to a much nicer AppendableData type, which allows appending bytes in a much Swiftier way.

fileprivate struct ControlPacket {
    var type: PacketType
    var flags: PacketFlags
    var variableHeader = AppendableData()
    var payload = AppendableData()

    var remainingLength: Data {
        ...
    }

    var encoded: Data {
        var bytes = Data()
        bytes.append(type.rawValue | flags.rawValue)
        bytes += remainingLength
        bytes += variableHeader.data
        bytes += payload.data
        return bytes
    }

    init(type: PacketType, flags: PacketFlags = .mostOnce) {
        self.type = type
        self.flags = flags
    }
}
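The body of remainingLength is elided above, and the following is not that code. It is only a sketch of the encoding rule from the MQTT 3.1.1 specification: the length of the variable header plus payload is written in groups of seven bits, least significant group first, with the high bit set on every byte except the last. The function name encodeRemainingLength is made up, and it returns a plain byte array rather than Data.

```swift
// Encodes a length as an MQTT 3.1.1 "Remaining Length" field (1 to 4 bytes).
func encodeRemainingLength(_ length: Int) -> [UInt8] {
    // Four bytes of seven bits each can hold at most 268,435,455.
    precondition(length >= 0 && length <= 268_435_455, "length out of range")

    var remaining = length
    var bytes: [UInt8] = []
    repeat {
        var byte = UInt8(remaining % 128)
        remaining /= 128
        // Set the continuation bit while more groups are left to write.
        if remaining > 0 {
            byte |= 0x80
        }
        bytes.append(byte)
    } while remaining > 0
    return bytes
}

print(encodeRemainingLength(321))  // prints "[193, 2]"
```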

Control packets can now be built in a neat way like this:

var packet = ControlPacket(type: .connect)
packet.payload += options.concreteClientId
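To give an idea of what such a += might do behind the scenes: MQTT encodes strings as UTF-8 bytes prefixed with a two-byte big-endian length. The sketch below uses a made-up ByteBuilder type as a stand-in. It is not the actual AppendableData from LightMQTT, whose internals are not shown here, and the flag and keep-alive values are picked only for illustration.

```swift
// Hypothetical stand-in for AppendableData, for illustration only.
struct ByteBuilder {
    private(set) var bytes: [UInt8] = []

    // Appends a single byte.
    static func += (lhs: inout ByteBuilder, rhs: UInt8) {
        lhs.bytes.append(rhs)
    }

    // Appends a 16-bit value in big-endian order.
    static func += (lhs: inout ByteBuilder, rhs: UInt16) {
        lhs.bytes.append(UInt8(rhs >> 8))
        lhs.bytes.append(UInt8(rhs & 0xff))
    }

    // Appends an MQTT string: two-byte length followed by the UTF-8 bytes.
    // Traps if the string is longer than 65,535 bytes.
    static func += (lhs: inout ByteBuilder, rhs: String) {
        let utf8 = Array(rhs.utf8)
        lhs += UInt16(utf8.count)
        lhs.bytes.append(contentsOf: utf8)
    }
}

// Variable header of an MQTT 3.1.1 CONNECT packet.
var header = ByteBuilder()
header += "MQTT"       // protocol name
header += UInt8(4)     // protocol level for MQTT 3.1.1
header += UInt8(0x02)  // connect flags: clean session only
header += UInt16(60)   // keep-alive in seconds

print(header.bytes)  // prints "[0, 4, 77, 81, 84, 84, 4, 2, 0, 60]"
```

Overloading += per type keeps the call sites short, at the cost of having to spell out integer types explicitly so the compiler can pick the right overload.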

The client also provides a small set of options, e.g. login credentials, TLS usage and a templatable client ID.

var options = LightMQTT.Options()
options.useTLS = true
options.port = 8883
options.username = "myuser"
options.password = "s3cr3t"
options.clientId = "myapp_%%%%"
options.pingInterval = 60
options.bufferSize = 4096
options.readQosClass = .background

let client = LightMQTT(host: "10.10.10.10", options: options)

The client works very reliably in my limited test use and should serve well in the Transporter app.

Check out the full LightMQTT source code on GitHub.

Pasi


  1. More recently, other pure Swift MQTT libraries have also sprung up. So far I’ve bumped into SwiftMQTT and Aphid.