LightMQTT
As I’m finally getting my realtime transit app ready for public release, it’s a good time to go through some things I’ve learned while open sourcing my code for the first time.
Transporter uses MQTT to receive vehicle data updates from the Digitransit APIs. While it's possible to poll this data repeatedly, too many vehicles are shown on the map simultaneously for that to work well.
The MQTT feed publishes JSON payloads which contain vehicle coordinates, delay compared to schedule, heading, next stop and other useful information.
{"VP":{"dir":"1","spd":7.5,"source":"sm5logger","desi":"I","lat":60.20808,"hdg":135,"tst":"2017-06-04T19:21:20.000Z","line":"XXX","start":"2126","long":24.920565,"jrn":"XXX","tsi":1496604080,"dl":60,"veh":"H9171","oper":"XXX","oday":"2017-06-04","stop_index":22}}
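A payload like the one above can be mapped onto a small Swift type. The following is only a sketch, assuming Swift 4's Codable is available; the names VehiclePosition, VehicleMessage and decodeVehiclePosition are hypothetical and not part of Transporter or LightMQTT, and only a subset of the fields from the sample is modelled.

```swift
import Foundation

// Hypothetical model for a subset of the fields in the sample payload.
struct VehiclePosition: Decodable {
    let veh: String    // vehicle identifier, "H9171" in the sample
    let lat: Double    // latitude
    let long: Double   // longitude
    let hdg: Int       // heading
    let dl: Int        // delay compared to schedule
}

// The feed nests the position under a top-level "VP" key.
struct VehicleMessage: Decodable {
    let position: VehiclePosition

    enum CodingKeys: String, CodingKey {
        case position = "VP"
    }
}

// Decodes one message payload and returns the nested position.
func decodeVehiclePosition(from payload: Data) throws -> VehiclePosition {
    return try JSONDecoder().decode(VehicleMessage.self, from: payload).position
}

// Usage with a shortened copy of the sample payload.
let samplePayload = Data("""
{"VP":{"veh":"H9171","lat":60.20808,"long":24.920565,"hdg":135,"dl":60}}
""".utf8)

if let position = try? decodeVehiclePosition(from: samplePayload) {
    print(position.veh, position.lat, position.long)
}
```

Keys that are not declared in the struct are ignored by JSONDecoder, so the model can stay as small as the app needs.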
In my use case, publishing MQTT messages was irrelevant. What I needed was a compact, easily understandable Swift class, and I simply could not find one back then. All available Swift libraries, like CocoaMQTT and Moscapsule, seemed to be just thin wrappers around Mosquitto, bringing along all its dependencies.
I had wanted to write a simple protocol client for quite some time, and soon after the first Swift release I had a simple MQTT client working. The MQTT specification was an important aid in getting the control packet bytes set up right. I learned about UnsafeMutablePointers and how not to copy data unnecessarily.
While the original LightMQTT implementation worked, it needed more work in the following areas:
- Handling repeated reconnects gracefully (Transporter connects clients in many viewDidAppear calls, only to soon disconnect them in viewDidDisappear)
- Memory handling (for the above reason, clients had to deinit reliably)
- Correct handling of bad network conditions (transit apps are often used in subways with unreliable 3G, or much worse)
- Parsing MQTT messages in a background thread, to keep the app UI running smoothly
I came across a Stack Overflow post by @cocoaphony where he suggested reading from InputStream in a simple while loop, dispatched to a background thread. This was a way nicer approach than using the StreamDelegate callbacks for handling bytesAvailable events, and kept stream reading and eventual message parsing away from the main thread.
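The idea of a blocking read loop can be sketched in isolation. This is a minimal illustration and not the LightMQTT code; the function name readLoop, its parameters and the sample bytes are made up for the example, and an in-memory InputStream stands in for a network stream.

```swift
import Foundation

// Reads from `input` until the stream ends or fails, handing each chunk
// to `onBytes`. `read` blocks, so this is meant to run off the main thread.
func readLoop(input: InputStream, bufferSize: Int = 4096, onBytes: ([UInt8]) -> Void) {
    var buffer = [UInt8](repeating: 0, count: bufferSize)
    while input.streamStatus == .open {
        let count = input.read(&buffer, maxLength: bufferSize)
        if count <= 0 {
            break  // 0 means end of stream, a negative value means an error
        }
        onBytes(Array(buffer[0..<count]))
    }
}

// Usage: an in-memory stream instead of a socket, read on a background queue.
let demoStream = InputStream(data: Data([0x30, 0x02, 0x00, 0x00]))
demoStream.open()
DispatchQueue.global(qos: .background).async {
    readLoop(input: demoStream) { bytes in
        print("received \(bytes.count) bytes")
    }
}
```

Because the loop owns the stream for its whole lifetime, no delegate or run loop scheduling is needed; closing the stream is enough to end the background work.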
The current LightMQTT version dispatches reading from a newly opened InputStream to a background queue.
DispatchQueue.global(qos: strongSelf.options.readQosClass).async {
    strongSelf.readStream(input: streams.input, output: streams.output)
}
Reading from the InputStream is a blocking operation, waiting for new bytes to arrive. After a 60-second timeout it either gives up and returns 0 or returns a negative value indicating an error. The while loop, and the whole background execution, only stays alive while the InputStream is open.
while messageParserState == .decodingHeader && input.streamStatus == .open {
    let count = input.read(messageBuffer, maxLength: 1)
    if count == 0 {
        break
    } else if count < 0 {
        return
    }
    if let message = PacketType(rawValue: messageBuffer.pointee & 0xf0) {
        messageType = message
        messageParserState = .decodingLength
        messageLengthMultiplier = 1
        messageLength = 0
    }
}
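After the header byte, the parser moves on to the length field. The MQTT specification encodes the remaining length in one to four bytes, seven bits per byte, with the high bit marking that another byte follows. The sketch below shows that decoding on a plain byte array; decodeRemainingLength is a hypothetical helper written for this post, not the stream-based code LightMQTT uses.

```swift
// Decodes an MQTT "remaining length" field from the start of `bytes`.
// Returns the decoded value and the number of bytes consumed, or nil if
// the field is incomplete or would need more than four bytes.
func decodeRemainingLength(_ bytes: [UInt8]) -> (value: Int, consumed: Int)? {
    var value = 0
    var multiplier = 1
    for (index, byte) in bytes.prefix(4).enumerated() {
        // The low seven bits carry data, least significant group first.
        value += Int(byte & 0x7f) * multiplier
        // A clear high bit marks the last byte of the field.
        if byte & 0x80 == 0 {
            return (value, index + 1)
        }
        multiplier *= 128
    }
    return nil
}
```

For example, the bytes 0xC1 0x02 decode to 65 + 2 * 128 = 321, which is the example the specification itself uses.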
The code recently received contributions from a fellow GitHub developer who nicely abstracted the details of a control packet away into their own struct. UInt8 arrays also made room for a much nicer AppendableData, which allows appending bytes in a much swiftier way.
fileprivate struct ControlPacket {
    var type: PacketType
    var flags: PacketFlags
    var variableHeader = AppendableData()
    var payload = AppendableData()
    var remainingLength: Data {
        ...
    }
    var encoded: Data {
        var bytes = Data()
        bytes.append(type.rawValue | flags.rawValue)
        bytes += remainingLength
        bytes += variableHeader.data
        bytes += payload.data
        return bytes
    }
    init(type: PacketType, flags: PacketFlags = .mostOnce) {
        self.type = type
        self.flags = flags
    }
}
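The body of remainingLength is left out above. For reference, the variable-length encoding defined by the MQTT specification can be sketched as a free function. This is not the library's implementation, and encodeRemainingLength is a hypothetical name used only here.

```swift
// Encodes a length using the MQTT variable-length scheme: seven bits per
// byte, least significant group first, with the high bit set on every byte
// except the last. The specification caps the value at 268,435,455.
func encodeRemainingLength(_ length: Int) -> [UInt8] {
    precondition(length >= 0 && length <= 268_435_455, "length out of range")
    var remaining = length
    var bytes: [UInt8] = []
    repeat {
        var byte = UInt8(remaining % 128)
        remaining /= 128
        if remaining > 0 {
            byte |= 0x80  // more bytes follow
        }
        bytes.append(byte)
    } while remaining > 0
    return bytes
}
```

In a packet struct like the one above, the input would presumably be the combined size of the variable header and the payload.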
Control packets can now be built in a neat way like this:
var packet = ControlPacket(type: .connect)
packet.payload += options.concreteClientId
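To give an idea of what such a += on a byte buffer might do, here is a minimal sketch of an appendable buffer. PacketBytes is a hypothetical stand-in and not the AppendableData type from LightMQTT. The one fact taken from the MQTT specification is that strings are written as UTF-8 bytes prefixed by a two-byte big-endian length.

```swift
// Hypothetical appendable byte buffer with += overloads per value type.
struct PacketBytes {
    private(set) var bytes: [UInt8] = []

    // Appends a single byte.
    static func += (lhs: inout PacketBytes, rhs: UInt8) {
        lhs.bytes.append(rhs)
    }

    // Appends a 16-bit value in big-endian order.
    static func += (lhs: inout PacketBytes, rhs: UInt16) {
        lhs.bytes.append(UInt8(rhs >> 8))
        lhs.bytes.append(UInt8(rhs & 0xff))
    }

    // Appends an MQTT string: two-byte length followed by the UTF-8 bytes.
    static func += (lhs: inout PacketBytes, rhs: String) {
        let utf8 = Array(rhs.utf8)
        precondition(utf8.count <= Int(UInt16.max), "string too long for MQTT")
        lhs += UInt16(utf8.count)
        lhs.bytes.append(contentsOf: utf8)
    }
}

// Usage: the protocol name and level as they appear at the start of a
// CONNECT variable header in MQTT 3.1.1.
var header = PacketBytes()
header += "MQTT"
header += UInt8(4)
print(header.bytes)
```

Integer literals need an explicit type at the call site here, since both the UInt8 and UInt16 overloads would accept a bare literal.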
The client also provides a small set of options, e.g. login credentials, TLS usage and a templatable client ID.
var options = LightMQTT.Options()
options.useTLS = true
options.port = 8883
options.username = "myuser"
options.password = "s3cr3t"
options.clientId = "myapp_%%%%"
options.pingInterval = 60
options.bufferSize = 4096
options.readQosClass = .background
let client = LightMQTT(host: "10.10.10.10", options: options)
The client works very reliably in my limited test use and should serve well in the Transporter app.
Check out the full LightMQTT source code on GitHub.
Pasi