You can follow the progress of this project on GitLab. This post covers the first few commits.

If you want to follow these posts, you’ll need following things:

  • A working TVHeadend installation, if you want to exactly copy this;
  • Rust installed on your development machine
  • Some knowledge of the Rust programming language; you might want to have read the (online) Rust book;
  • Some imagination; I’m writing this after the facts.

I have recently been interested in teaching myself Rust, which is a programming language focussed on performance and safety.

I am writing this article mostly as a reference to tokio for myself and for others; it’s assumed that you have some Rust knowledge. Most tokio documentation implements a server, as Rust is especially useful for servers, because of its performance and safety features. What I wanted to accomplish was to write a client for tvheadend, which is a free TV DVR system for GNU/Linux and other operating systems.

My main goal with this library is to provide an interface to TVHeadend’s API through its “proprietary” htsp protocol, so I can write some high level applications that interface with my TVHeadend systems.

The protocol is documented on a high level here, and the messages encoding scheme can also be found. I call the protocol “proprietary” because for one, only TVHeadend and Kodi implement it, as respectively server and client, and second, because its implementation is the best source of documentation.

So, enough talk, let’s get our hands dirty, shall we?

Creating a new project

I’ll be using cargo, Rust’s package manager and project manager, throughout the whole project. This project consists of a library, so we’ll create a shiny new project first:

cargo new htsp
cd htsp

Now, the first thing I did, was to add some dependencies. You’ll probably do that on a as-required basis yourself, but in any case, it is useful to have a look at your project configuration file Cargo.toml.

I added tokio_core and byteorder as dependencies; the first one is useful for TCP and socket related stuff, the latter one deals with big endian and little endian conversion.

[dependencies]
tokio-core = "0.1"
byteorder = "1"
log = "0.3"

[dev-dependencies]
matches = "0.1.4"

I also added log, which provides some macro’s that’ll print messages, but doesn’t interfere with the main executable if they don’t want to. The dev-dependency matches provides the macro matches!, which is useful when implementing tests.

I immediately import both crates into my project by altering the lib.rs file:

extern crate tokio_core;
extern crate byteorder;

that way, we can use them throughout the whole project!

Implementing the codec

I started off by implementing the codec, which is the part of the protocol that encodes and decodes the messages. I mostly built the serialization to the specs, which I then only could really test Therefore, I started a new file, message.rs, in which I’ll implement the serialization of messages. In lib.rs, you should declare the new module:

mod message;

By reading the htsmsg spec, it seems like a binary protocol. As it turns out, Rust is pretty neat in handling these thing.

Anyhow, there are five kind of field types in a message, and a message is basically a map. I added these types in a comment in the messages.rs file, for my own reference during implementation:

// ||Name||ID||Description
// ||Map ||1 ||Sub message of type map
// ||S64 ||2 ||Signed 64bit integer
// ||Str ||3 ||UTF-8 encoded string
// ||Bin ||4 ||Binary blob
// ||List||5 ||Sub message of type list

The most Rust-y way to implement the notion of “a field is either of these”, is probably an enum:

struct MapMessage;
struct ListMessage;
pub enum HtsMsgField {
    Map(MapMessage),
    S64(i64),
    Str(String),
    Bin(Vec<u8>),
    List(ListMessage),
}

There is a bunch of stuff happening here; we’re specifying that several HtsMsgField types contain certain extra information. The Map type contains a MapMessage, which in turn will consist of different (name, HtsMsgFields) pairs, the List type is the same idea, without the key, and S64, Str, and Bin can all be implemented in terms of already existing types.

We’ll create a field fields in the MapMessage and the ListMessage, in which the Map and List contents are stored.

pub struct MapMessage {
    fields: HashMap<String, HtsMsgField>,
}
pub struct ListMessage {
    fields: Vec<HtsMsgField>
}

As the MapMessage and ListMessage both contain the same kind of data, we’re able to abstract over the contents a bit when decoding.

I’ll restrict this article to the implementation of MapMessage, everything else can be done in an analoge way, and would be too boring to list up here.

use tokio_core::io::EasyBuf;

impl MapMessage {
    pub fn decode(buf: &mut EasyBuf) -> io::Result<Self> {
        let extractor = FieldExtractor::extract(buf);
        let items: io::Result<Vec<_>> = extractor.collect();
        let fields = HashMap::from_iter(try!(items));

        Ok(MapMessage{
            fields: fields,
        })
    }
    pub fn encode(&self, buf: &mut Vec<u8>) -> io::Result<()> {
        for (name, value) in &self.fields {
            try!(value.encode(name, buf));
        }
        trace!("Encoded map msg");
        Ok(())
    }
}

So, that decoder constructs a HashMap from binary data, using FieldExtractor. We can also use the FieldExtractor in the ListMessage, by discarding the field name after extraction. Or even better; as the spec requires the field name to be empty, we can return an error while mapping over the items iterator.

For the details, you can check out my message.rs. The map an sich is not so difficult to construct, but you’ll need to keep your attention. I use a lot of Result::from_iter to convert between Vec<Result<_,_>> and Result<Vec<_>,_>, which really comes in handy here.

So, on to the implementation of FieldExtractor. Notice that we use the length of the data in the EasyBuf, which we will delimit before even feeding it into MapMessage.decode.

struct FieldExtractor<'a> {
    buf: &'a mut EasyBuf,
}

impl<'a> FieldExtractor<'a> {
    fn extract(buf: &'a mut EasyBuf) -> FieldExtractor<'a> {
        FieldExtractor {
            buf: buf,
        }
    }
}
impl<'a> Iterator for FieldExtractor<'a> {
    type Item = io::Result<(String, HtsMsgField)>;

    fn next(&mut self) -> Option<Self::Item> {
        // As long as we have data in the intermediate buffer
        if self.buf.len() == 0 {
            return None;
        }
        trace!("Decoding message with {} bytes", self.buf.len());
        match HtsMsgField::decode(self.buf) {
            Ok((name, val)) => Some(Ok((name, val))),
            Err(error) => Some(Err(error)),
        }
    }
}

We conveniently implement Iterator, so that we can just use regular .map, .collect, and stuff like that.

So, now a Map object can be decoded, we still need to implement the decoding of a whole message. tokio has a neat Trait for that, which is the tokio_core::io::Codec Trait. When implemented, it stiches together with other Traits to more or less automagically provide you with your protocol on top of TCP.

You implement the Codec Trait on a pseudo struct, like this:

impl Codec for HtspCodec {
    type In = (RequestId, MapMessage);
    type Out = (RequestId, MapMessage);

    fn decode(&mut self, buf: &mut EasyBuf) -> io::Result<Option<Self::In>> {
        // This method returns Ok(None) when no data could be decoded,
        // but no error occured. For example when not enough data was received yet.
        //
        // Otherwise, you return Ok(Some(msg)), where msg is your decoded msg.
        // You will also have to consume the right amount of bytes from the EasyBuf,
        // when yielding a message.
        // Otherwise, you leave the buffer untouched.
    }

    fn encode(&mut self, (id, mut msg): Self::Out, buf: &mut Vec<u8>) -> io::Result<()> {
        // Here you take the supplied (id, msg) and serialize them into the buffer.
        // Return Ok(()) when there's no error. Pretty straight forward.
    }
}

I left out the implementation, because it’s quite long. Check it out on the repo. What I do is I read the first four bytes in big endian into len; they contain the message length. If then appears that the socket has the right amount of bytes available, I consume the four length bytes, and then I drain len bytes into an intermediate buffer. That one, I decode using MapMessage.

tokio makes sure that this method is called again when the buffer is not yet empty; no need to loop these things yourself.

Now a word about these:

    type In = (RequestId, MapMessage);
    type Out = (RequestId, MapMessage);

tokio differentiates between multiplexed protocols, and pipelined protocols. In the latter, an answer follows directly on a request, in order. In the former, an answer contains an ID, which the request generated; that way multiple messages can be sent after eachother, without having to wait on a response.

The nice thing is that tokio takes care about these things, so that you’ll receive the right answer on your request.

In htsp, there’s a notion of a seq (sequence number) attribute, so I modify the message, add the seq parameter, and serialize that.

If you don’t need multiplexing, you can just use

impl Codec for HtspCodec {
    type In = MapMessage;
    type Out = MapMessage;

    fn decode(&mut self, buf: &mut EasyBuf) -> io::Result<Option<Self::In>> {
        //
    }

    fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> io::Result<()> {
        //
    }
}

On this point, you’ll also want a bunch of unit tests for your decoding and encoding system. Suggestions: write out some binary encodings by head, put them into a vec![], and try to decode them. Secondary, you try to re-encode them, and you check whether your new encoding matches the first one. You can also do it the other way around: encode something, and try to decode it again.

Want to know how? Have a look here. An example:

#[cfg(test)]
#[test]
fn simple_decode() {
    let mut state = HtspCodec;

    // The shortest message possible.
    let mut b = EasyBuf::from(vec![0u8, 0u8, 0u8, 0u8]);
    assert!(state.decode(&mut b).is_ok());

    // The second shortest message possible:
    // A message { "a" => 7 }
    let mut b = EasyBuf::from(vec![0u8, 0u8, 0u8, 8u8,
                                   2u8, 1u8, 0u8, 0u8, 0u8, 1u8,
                                  97u8, 7u8]);
    let (id, msg) = state.decode(&mut b).unwrap().unwrap();
    assert_eq!(id, 0);
    assert!(matches!(msg.get("a".into()), Some(&HtsMsgField::S64(7))));
}

This one tries to decode the smallest possible message (an empty message), and a message {“a” => 7}. If things match up, the test succeeds. Remember to use cargo test to test your code, and to make a .gitlab-ci.yml file to have continuous integration!

I think that’s it for this post. Stay tuned!