All in all, happy with it. Now to work on actually implementing the actual functional parts that execute said config.
Continuing with this project, I've got a little bit of complexity coming up.
I've got to implement rate limiting a bitstream. I'm handling media streams that deliver packets containing media frames. I want to be able to limit the stream to a maximum bitrate. Very simple approach where, if we exceed the bitrate, just start dropping packets until we're under the rate again, and leave repairing the stream to the underlying codec/protocol.
In general, I prefer to lean on the capabilities of the language / library / compiler that I'm using. I think it's a bad idea to try and second guess (at first) the garbage collector or whatever tool you're using. Someone else has invested a lot of effort and engineering smarts into making it work well. It's good practice to lean into that.
Write simple, readable code first, and then if it's not performing to an acceptable level, then start digging in and seeing if you need to handle some special case manually.
In this case though... well, the naive logic of how to implement this kinda makes me worry a bit.
The naive logic would be "keep a linked list of every packet that passes through for the past X seconds, and every time a new packet comes in, add them all up, find the average bitrate, and drop/pass".
I don't know how I feel about possibly generating a continuous stream of garbage, in a tight main loop.
Plus the signature of how this module should work is super simple that, if I'm just sperging here for no justifiable engineering reason (distinct possibility), well, I can just tear it out and drop in a naive implementation.
Ok so, to the meat of the issue. I want to create a RateLimiter module with an Ocaml signature like this:
Code:
module RateLimiter :
sig
type t
val create : max_bitrate:int -> unit -> t (* haven't added some settings, work in progress *)
val record_and_check : t -> int -> bool
end
Very simple. There's a
RateLimiter.create ~max_bitrate ()
function that returns a
RateLimiter.t
, and then a
RateLimiter.record_and_check rl byte_count
function that records the incoming data and returns true or false depending on if the packet should be dropped.
There's a convention in Ocaml to create modules named after nouns, and they usually contain a main type just called
t
, and then a bunch of functions for manipulating the
t
s. You could call it
type rate_limiter
instead if you wanted. But
t
works.
So the algorithm I'm working on right now works by dividing our sample period (which could be a second, half a second, whichever, it'll be configurable in an argument to
RateLimiter.create
) into a number of "buckets". Each bucket records how many bits passed by in that chunk of the sample period.
So in a rough pseudocode, let's say we have:
Code:
buckets = [15, 10, 5, 10, 12, 8, 18, 4, 8, 2]
last_update_time = 100.33 # seconds
total_bitrate = 82
Let's say we are sampling for 1 second, and each of those buckets represents 0.1 second.
Based on a timestamp of 100.33, the current bucket is index 3, which currently has a value of 10 bits (I guess).
While our update time is below 100.4, we'll keep incrementing that 3-indexed bucket. So if current update time is 100.35 and the packet has 2 bits (very small packet!), our state should now be:
Code:
buckets = [15, 10, 5, 12, 12, 8, 18, 4, 8, 2]
last_update_time = 100.35 # seconds
total_bitrate = 84
But when we tick over into 100.4, we have to start recycling buckets. We subtract the value of the recycled bucket from the total_bitrate, zero out that value, and then add our new bits in. That's because that bucket is now longer, time-wise, than our sample period in the past, thus it no longer matters for the bitrate anymore.
So if we tick into 100.4 and we're handling a new packet with 7 bits, our state should look like this:
Code:
buckets = [15, 10, 5, 12, 7, 8, 18, 4, 8, 2]
last_update_time = 100.35 # seconds
total_bitrate = 79 # 84 - 12, then plus 7 bits
The one last unhandled case is if we go longer than the sample period for updates. The solution to that is easy; just zero everything out. Zero bits are zero bits.
I know I repeat myself a lot, but this is the sort of thing I really appreciate about Ocaml, is having the escape valve of side-effecting operations when you need them.
The majority of this program is very functional. High level concepts, mapping and iterating over functional structures. But in the very tightest loops, being able to write side effecting code when necessary is really convenient.
This would be super awkward in Haskell, I feel. (I suppose I shit on Haskell a lot comparing it to Ocaml and SML; one of these days I should do some project in Haskell so I know what I'm talking about. Probably not this one though.)
Oh, and one final thing, I'm using an Ocaml library called
Mtime for the clock stuff. The main advantage over it instead of just using
time(2) directly is that it isn't disturbed by system clock changes. Which, if you've got a daemon running for weeks on end, and some ntpd sync comes through, it won't get disturbed. Minor detail that has the potential to cause more profound problems.
Anyway, I have the logic written, just writing the tests. The meat of the above algorithm is contained in this code:
Code:
let update_buckets t =
let new_span = Mtime_clock.count t.start in
if new_span >= t.cycle_duration
then reset_to_zero t
else
let last_index = span_index t t.last_span in
let new_index = span_index t new_span in
let rec aux ~index =
let index = index % Array.length t.buckets in
t.total <- t.total - t.buckets.(index);
t.buckets.(index) <- 0;
if index >= new_index
then ()
else aux ~index:(index + 1)
in
aux ~index:(last_index + 1);
t.last_span <- new_span
The high level
RateLimiter.record_and_check
function internally calls
update_buckets
, then it just finishes up the bookkeeping and returns true or false.