drewdevault.com

[mirror] blog and personal website of Drew DeVault git clone https://hacktivis.me/git/mirror/drewdevault.com.git


  1. ---
  2. title: Using io_uring to make a high-performance... finger server
  3. date: 2021-05-24
  4. outputs: [html, gemtext]
  5. ---
  6. I'm working on adding a wrapper for the [Linux io_uring interface][0] to my
  7. [secret programming language project][1]. To help learn more about io_uring and
  8. to test out the interface I was designing, I needed a small project whose design
  9. was well-suited for the value-add of io_uring. The [Finger protocol][2] is
  10. perfect for this! After being designed in the 70's and then completely forgotten
  11. about for 50 years, it's the perfect small and simple network protocol to test
  12. drive this new interface with.
  13. [0]: https://unixism.net/loti/what_is_io_uring.html
  14. [1]: https://drewdevault.com/2021/03/19/A-new-systems-language.html
  15. [2]: https://en.wikipedia.org/wiki/Finger_protocol
  16. In short, finger will reach out to a remote server and ask for information about
  17. a user. It was used back in the day to find contact details like the user's
  18. phone number, office address, email address, sometimes their favorite piece of
  19. ASCII art, and, later, a summary of the things they were working on at the
  20. moment. The somewhat provocative name allegedly comes from an older usage of
  21. the word to mean "a snitch" or a member of the FBI. The last useful RFC related
  22. to Finger is [RFC 1288][3], circa 1991, which will be our reference for this
  23. server. If you want to give it a test drive, try this to ping the server we'll
  24. be discussing today:
  25. ```
  26. printf 'drew\r\n' | nc drewdevault.com 79
  27. ```
  28. You might also have the finger command installed locally (try running "finger
  29. drew\@drewdevault.com"), and you can try out the [Castor][4] browser by sourcehut
  30. user ~julienxx for a graphical experience.
  31. [3]: https://datatracker.ietf.org/doc/html/rfc1288
  32. [4]: https://sr.ht/~julienxx/Castor/
  33. And what is io_uring? It is the latest interface for async I/O on Linux, and
  34. it's pretty innovative and interesting. The basic idea is to set up some memory
  35. which is shared between the kernel and the userspace program, and stash a couple
  36. of ring buffers there that can be updated with atomic writes. Userspace appends
  37. submission queue entries (SQEs) to the submission queue (SQ), and the kernel
  38. processes the I/O requests they describe and then appends completion queue
  39. events (CQEs) to the completion queue (CQ). Interestingly, both sides can see
  40. this happening *without* entering the kernel with a syscall, which is a major
  41. performance boost. It more or less solves the async I/O problem for Linux, a
  42. problem which Linux (and Unix at large) has struggled with for a long time.
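To make the shared-ring idea concrete, here is a toy single-producer, single-consumer ring sketched in Python rather than in the language used in this post. The class name `Ring` and its methods are made up for illustration. Real io_uring rings live in memory mapped into both the kernel and the process and rely on atomic loads and stores with memory barriers, none of which this sketch models; it only shows how a head index and a tail index let each side work without asking the other.

```python
class Ring:
    """Toy fixed-size ring: the producer only moves `tail`, the consumer only moves `head`."""

    def __init__(self, entries):
        # Power-of-two sizes let us wrap an index with a bit mask.
        assert entries > 0 and entries & (entries - 1) == 0
        self.slots = [None] * entries
        self.mask = entries - 1
        self.head = 0  # next slot to consume
        self.tail = 0  # next slot to fill

    def push(self, item):
        if self.tail - self.head == len(self.slots):
            return False  # ring is full
        self.slots[self.tail & self.mask] = item
        self.tail += 1  # publishing the new tail makes the entry visible
        return True

    def pop(self):
        if self.head == self.tail:
            return None  # ring is empty
        item = self.slots[self.head & self.mask]
        self.head += 1  # advancing head hands the slot back to the producer
        return item


sq = Ring(4)
for op in ("accept", "read", "timeout"):
    sq.push(op)
print(sq.pop(), sq.pop())  # accept read
```

In io_uring terms, userspace is the producer on the submission ring and the consumer on the completion ring, and the kernel plays the opposite roles.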
  43. With the background in place, I'm going to walk you through my finger
  44. server's code. Given that this is written in an as-of-yet unreleased programming
  45. language, I'll do my best to help you decipher the alien code.
  46. <details>
  47. <summary>A quick disclaimer</summary>
  48. <p>
  49. This language, the standard library, and the interface provided by
  50. linux::io_uring, are all works in progress and are subject to change. In
  51. particular, this program will become obsolete when we design a portable I/O
  52. bus interface, which on Linux will be backed by io_uring but on other systems
  53. will use kqueue, poll, etc.
  54. <p>
  55. As a rule of thumb, anything which uses rt:: or linux:: is likely to change or
  56. be moved behind a portable abstraction in the future.
  57. </details>
  58. Let's start with the basics:
  59. <!--
  60. Yes, it's called Hare. Now keep that to yourself.
  61. -->
  62. ```hare
  63. use fmt;
  64. use getopt;
  65. use net::ip;
  66. use strconv;
  67. use unix::passwd;
  68. def MAX_CLIENTS: uint = 128;
  69. export fn main() void = {
  70. let addr: ip::addr = ip::ANY_V6;
  71. let port = 79u16;
  72. let group = "finger";
  73. const cmd = getopt::parse(os::args,
  74. "finger server",
  75. ('B', "addr", "address to bind to (default: all)"),
  76. ('P', "port", "port to bind to (default: 79)"),
  77. ('g', "group", "user group enabled for finger access (default: finger)"));
  78. defer getopt::finish(&cmd);
  79. for (let i = 0z; i < len(cmd.opts); i += 1) {
  80. const opt = cmd.opts[i];
  81. switch (opt.0) {
  82. 'B' => match (ip::parse(opt.1)) {
  83. a: ip::addr => addr = a,
  84. ip::invalid => fmt::fatal("Invalid IP address"),
  85. },
  86. 'P' => match (strconv::stou16(opt.1)) {
  87. u: u16 => port = u,
  88. strconv::invalid => fmt::fatal("Invalid port"),
  89. strconv::overflow => fmt::fatal("Port exceeds range"),
  90. },
  91. 'g' => group = opt.1,
  92. };
  93. };
  94. const grent = match (passwd::getgroup(group)) {
  95. void => fmt::fatal("No '{}' group available", group),
  96. gr: passwd::grent => gr,
  97. };
  98. defer passwd::grent_finish(grent);
  99. };
  100. ```
  101. None of this code is related to io_uring or finger; it just handles some
  102. initialization work. This is the daemon program, and it will accept some basic
  103. configuration via the command line. The getopt configuration shown here will
  104. produce the following help string:
  105. ```
  106. $ fingerd -h
  107. fingerd: finger server
  108. Usage: ./fingerd [-B <addr>] [-P <port>] [-g <group>]
  109. -B <addr>: address to bind to (default: all)
  110. -P <port>: port to bind to (default: 79)
  111. -g <group>: user group enabled for finger access (default: finger)
  112. ```
  113. The basic idea is to make finger access opt-in for a given Unix account by
  114. adding them to the "finger" group. The "passwd::getgroup" lookup fetches that
  115. entry from /etc/group to identify the list of users for whom we should be
  116. serving finger access.
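On a typical Linux system the same group lookup can be tried from Python's standard `grp` module. This is a sketch in Python, not the post's language; the helper name `finger_users` is made up, and only the group name "finger" comes from the article.

```python
import grp


def finger_users(group="finger"):
    """Return the set of usernames listed as members of `group`."""
    try:
        return set(grp.getgrnam(group).gr_mem)
    except KeyError:
        # No such group: nobody has opted in.
        return set()


print(sorted(finger_users()))
```

Note that `gr_mem` only lists supplementary members from the group database, so a user whose primary group is "finger" would not appear here.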
  117. ```hare
  118. let serv = match (net::listen(addr, port,
  119. 256: net::backlog, net::reuseport)) {
  120. err: io::error => fmt::fatal("listen: {}", io::strerror(err)),
  121. l: *net::listener => l,
  122. };
  123. defer net::shutdown(serv);
  124. fmt::printfln("Server running on :{}", port)!;
  125. ```
  126. Following this, we set up a TCP listener. I went for a backlog of 256
  127. connections (overkill for a finger server, but hey), and set reuseport so you
  128. can achieve CLOUD SCALE by running several daemons at once.
  129. Next, I set up the io_uring that we'll be using:
  130. ```hare
  131. // The ring size is 2 for the accept and sigfd read, plus 2 SQEs for
  132. // each of up to MAX_CLIENTS: either read/write plus a timeout, or up to
  133. // two close SQEs during cleanup.
  134. static assert(MAX_CLIENTS * 2 + 2 <= io_uring::MAX_ENTRIES);
  135. let params = io_uring::params { ... };
  136. let ring = match (io_uring::setup(MAX_CLIENTS * 2 + 2, &params)) {
  137. ring: io_uring::io_uring => ring,
  138. err: io_uring::error => fmt::fatal(io_uring::strerror(err)),
  139. };
  140. defer io_uring::finish(&ring);
  141. ```
  142. If we were running this as root (and we often are, given that fingerd binds to
  143. port 79 by default), we could go syscall-free by adding
  144. `io_uring::setup_flags::SQPOLL` to `params.flags`, but this requires more
  145. testing on my part so I have not added it yet. With this configuration, we'll
  146. need to use the `io_uring_enter` syscall to submit I/O requests.
  147. We also have to pick a queue size when setting up the uring. I planned this out
  148. so that we can have two SQEs in flight for every client at once (either a
  149. read/write request plus its corresponding timeout, or the two "close" requests
  150. used when disconnecting the client), plus two extra entries: one for the
  151. "accept" call, and another to wait for signals from a signalfd.
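The sizing arithmetic is easy to check by hand. The Python sketch below (helper names are hypothetical) reproduces it and also shows a rounding step: the kernel is understood to round the requested entry count up to the next power of two, so 258 would become 512. Treat that second number as an assumption and check io_uring_setup(2) for your kernel before relying on it.

```python
MAX_CLIENTS = 128


def ring_entries(max_clients):
    # Two SQEs per client (I/O plus timeout, or two closes),
    # plus one for accept and one for the signalfd poll.
    return max_clients * 2 + 2


def next_pow2(n):
    # Smallest power of two that is >= n (for n >= 1).
    return 1 << (n - 1).bit_length()


requested = ring_entries(MAX_CLIENTS)
print(requested, next_pow2(requested))  # 258 512
```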
  152. Speaking of signalfds:
  153. ```hare
  154. let mask = rt::sigset { ... };
  155. rt::sigaddset(&mask, rt::SIGINT)!;
  156. rt::sigaddset(&mask, rt::SIGTERM)!;
  157. rt::sigprocmask(rt::SIG_BLOCK, &mask, null)!;
  158. let sigfd = signalfd::signalfd(-1, &mask, 0)!;
  159. defer rt::close(sigfd)!;
  160. const files = [net::listenerfd(serv) as int, sigfd];
  161. io_uring::register_files(&ring, files)!;
  162. const sqe = io_uring::must_get_sqe(&ring);
  163. io_uring::poll_add(sqe, 1, rt::POLLIN: uint, flags::FIXED_FILE);
  164. io_uring::set_user(sqe, &sigfd);
  165. ```
  166. We haven't implemented a high-level signal interface yet, so this is just using
  167. the syscall wrappers. I chose to use a signalfd here so I can monitor for SIGINT
  168. and SIGTERM with my primary I/O event loop, to (semi-)gracefully[^1] terminate
  169. the server.
  170. [^1]: Right now the implementation drops all in-flight requests during shutdown. If we wanted to be even more graceful, it would be pretty easy to stop accepting new connections and do a soft shutdown while we finish servicing any active clients. net::reuseport would allow us to provide zero downtime during reboots with this approach, since another daemon could continue servicing users while this one is shutting down.
  171. This also happens to show off our first SQE submission. "must_get_sqe" will
  172. fetch the next SQE, asserting that there is one available, which relies on the
  173. math I explained earlier when planning for our queue size. Then, we populate
  174. this SQE with a "poll_add" operation, which polls on the fixed file descriptor
  175. at index 1. The "register_files" call above adds the socket and signal file
  176. descriptors to the io_uring's list of "fixed" file descriptors, in that order,
  177. and so with "flags::FIXED_FILE" index 1 refers to the signalfd.
  178. We also set the user_data field of the SQE with "set_user". This will be copied
  179. to the CQE later, and it's necessary that we provide a unique value in order to
  180. correlate the CQE back to the SQE it refers to. We can use any value, and the
  181. address of the signalfd variable is a convenient number we can use for this
  182. purpose.
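The correlation trick can be modelled without any kernel involvement. In this Python sketch the names `submit`, `complete`, and `pending` are invented for illustration; the point is only that whatever opaque value goes in with a request comes back with its completion, so it can serve as a lookup key.

```python
pending = {}  # user_data token -> description of the request


def submit(user_data, op):
    # The token must be unique among in-flight requests.
    assert user_data not in pending
    pending[user_data] = op


def complete(user_data, result):
    # The completion carries the same token back, which tells us
    # which request finished.
    op = pending.pop(user_data)
    return op, result


# Any unique number works; the identity of a live object plays the
# role that "the address of a variable" plays in the article.
sigfd = object()
peeraddr = object()
submit(id(sigfd), "poll signalfd")
submit(id(peeraddr), "accept")
print(complete(id(peeraddr), 7))  # ('accept', 7)
```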
  183. There's one more step &mdash; submitting the SQE &mdash; but that'll wait until
  184. we set up more I/O. Next, I have set up a "context" structure which will store
  185. all of the state the server needs to work with, to be passed to functions
  186. throughout the program.
  187. ```hare
  188. type context = struct {
  189. users: []str,
  190. clients: []*client,
  191. uring: *io_uring::io_uring,
  192. };
  193. // ...
  194. const ctx = context {
  195. users = grent.userlist,
  196. uring = &ring,
  197. ...
  198. };
  199. ```
  200. The second "\..." towards the end is not for illustrative purposes - it sets all
  201. of the remaining fields to their default values (in this case, clients becomes
  202. an empty slice).
  203. Finally, this brings us to the main loop:
  204. ```hare
  205. let accept_waiting = false;
  206. for (true) {
  207. const peeraddr = rt::sockaddr { ... };
  208. const peeraddr_sz = size(rt::sockaddr): uint;
  209. if (!accept_waiting && len(ctx.clients) < MAX_CLIENTS) {
  210. const sqe = io_uring::must_get_sqe(ctx.uring);
  211. io_uring::accept(sqe, 0, &peeraddr, &peeraddr_sz,
  212. 0, flags::FIXED_FILE);
  213. io_uring::set_user(sqe, &peeraddr);
  214. accept_waiting = true;
  215. };
  216. io_uring::submit(&ring)!;
  217. let cqe = match (io_uring::wait(ctx.uring)) {
  218. err: io_uring::error => fmt::fatal("Error: {}",
  219. io_uring::strerror(err)),
  220. cqe: *io_uring::cqe => cqe,
  221. };
  222. defer io_uring::cqe_seen(&ring, cqe);
  223. const user = io_uring::get_user(cqe);
  224. if (user == &peeraddr) {
  225. accept(&ctx, cqe, &peeraddr);
  226. accept_waiting = false;
  227. } else if (user == &sigfd) {
  228. let si = signalfd::siginfo { ... };
  229. rt::read(sigfd, &si, size(signalfd::siginfo))!;
  230. fmt::errorln("Caught signal, terminating")!;
  231. break;
  232. } else for (let i = 0z; i < len(ctx.clients); i += 1) {
  233. let client = ctx.clients[i];
  234. if (user == client) {
  235. dispatch(&ctx, client, cqe);
  236. break;
  237. };
  238. };
  239. };
  240. ```
  241. At each iteration, assuming we have room and aren't already waiting on a new
  242. connection, we submit an "accept" SQE to fetch the next incoming client. This
  243. SQE accepts an additional parameter to write the client's IP address to, which
  244. we provide via a pointer to our local peeraddr variable.
  245. We call "submit" at the heart of the loop to submit any SQEs we have pending
  246. (including both the signalfd poll and the accept call, but also anything our
  247. future client handling code will submit) to the io_uring, then wait for the next
  248. CQE from the kernel.
  249. When we get one, we defer a "cqe_seen", which will execute at the end of the
  250. current scope (i.e. the end of this loop iteration) to advance our end of the
  251. completion queue, then figure out what I/O request was completed. The code
  252. earlier sets up SQEs for the accept and signalfd, which we check here. If a
  253. signal comes in, we read the details to acknowledge it and then terminate the
  254. loop. We also check if the user data was set to the address of any client state
  255. data, which we'll use to dispatch for client-specific I/O later on. If a new
  256. connection comes in:
  257. ```hare
  258. fn accept(ctx: *context, cqe: *io_uring::cqe, peeraddr: *rt::sockaddr) void = {
  259. const fd = match (io_uring::result(cqe)) {
  260. err: io_uring::error => fmt::fatal("Error: accept: {}",
  261. io_uring::strerror(err)),
  262. fd: int => fd,
  263. };
  264. const peer = net::ip::from_native(*peeraddr);
  265. const now = time::now(time::clock::MONOTONIC);
  266. const client = alloc(client {
  267. state = state::READ_QUERY,
  268. deadline = time::add(now, 10 * time::SECOND),
  269. addr = peer.0,
  270. fd = fd,
  271. plan_fd = -1,
  272. ...
  273. });
  274. append(ctx.clients, client);
  275. submit_read(ctx, client, client.fd, 0);
  276. };
  277. ```
  278. This is fairly self-explanatory, but we do see the first example of how to
  279. determine the result from a CQE. The result field of the CQE structure the
  280. kernel fills in is set to what would normally be the return value of the
  281. equivalent syscall, and "linux::io_uring::result" is a convenience function
  282. which translates negative values (i.e. errno) into a more idiomatic result type.
  283. We choose a deadline here, 10 seconds from when the connection is established,
  284. for the entire exchange to be completed by. This helps to mitigate
  285. [Slowloris][loris] attacks, though there are more mitigations we could implement
  286. for this.
  287. [loris]: https://en.wikipedia.org/wiki/Slowloris_(computer_security)
  288. Our client state is handled by a state machine, which starts in the
  289. "READ_QUERY" state. Per the RFC, the client will be sending us a query, followed
  290. by a CRLF. Our initial state is prepared to handle this. The full client state
  291. structure is as follows:
  292. ```hare
  293. type state = enum {
  294. READ_QUERY,
  295. OPEN_PLAN,
  296. READ_PLAN,
  297. WRITE_RESP,
  298. WRITE_ERROR,
  299. };
  300. type client = struct {
  301. state: state,
  302. deadline: time::instant,
  303. addr: ip::addr,
  304. fd: int,
  305. plan_fd: int,
  306. plan_path: *const char,
  307. xbuf: [2048]u8,
  308. buf: []u8,
  309. };
  310. ```
  311. Each field will be explained in due time. We add this to our list of active
  312. connections and call "submit_read".
  313. ```hare
  314. fn submit_read(ctx: *context, client: *client, fd: int, offs: size) void = {
  315. const sqe = io_uring::must_get_sqe(ctx.uring);
  316. const maxread = len(client.xbuf) / 2;
  317. io_uring::read(sqe, fd, client.xbuf[len(client.buf)..]: *[*]u8,
  318. maxread - len(client.buf), offs: u64, flags::IO_LINK);
  319. io_uring::set_user(sqe, client);
  320. let ts = rt::timespec { ... };
  321. time::instant_to_timespec(client.deadline, &ts);
  322. const sqe = io_uring::must_get_sqe(ctx.uring);
  323. io_uring::link_timeout(sqe, &ts, timeout_flags::ABS);
  324. };
  325. ```
  326. I've prepared two SQEs here. The first is a read, which will fill half of the
  327. client buffer with whatever they send us over the network (why half? I'll
  328. explain later). It's configured with "flags::IO_LINK", which will link it to the
  329. following request: a timeout. This will cause the I/O to be cancelled if it
  330. doesn't complete before the deadline we set earlier. "timeout_flags::ABS"
  331. specifies that the timeout is an absolute timestamp rather than a duration
  332. computed from the time of I/O submission.
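Why an absolute deadline rather than a per-operation duration? The following Python sketch, with made-up helper names and a pretend clock, contrasts the two. With an absolute deadline the time left shrinks with every operation, while a relative timeout restarts the clock each time, which is exactly what a slow-drip client wants.

```python
DEADLINE_SECONDS = 10.0


def remaining(deadline, now):
    # With an absolute deadline, every later operation gets less time.
    return max(0.0, deadline - now)


def total_allowed_relative(per_op_timeout, ops):
    # With a relative timeout, each operation restarts the clock,
    # so a slow client can stretch the exchange to ops * timeout.
    return per_op_timeout * ops


accepted_at = 100.0  # pretend monotonic clock reading at accept time
deadline = accepted_at + DEADLINE_SECONDS
for now in (100.0, 104.0, 109.5, 111.0):
    print(now, remaining(deadline, now))
```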
  333. I set the user data to the client state pointer, which will be used the next
  334. time we have a go-around in the main event loop (feel free to scroll back up if
  335. you want to re-read that bit). The event loop will send the CQE to the dispatch
  336. function, which will choose the appropriate action based on the current client
  337. state.
  338. ```hare
  339. fn dispatch(ctx: *context, client: *client, cqe: *io_uring::cqe) void = {
  340. match (switch (client.state) {
  341. state::READ_QUERY => client_query(ctx, client, cqe),
  342. state::OPEN_PLAN => client_open_plan(ctx, client, cqe),
  343. state::READ_PLAN => client_read_plan(ctx, client, cqe),
  344. state::WRITE_RESP, state::WRITE_ERROR =>
  345. client_write_resp(ctx, client, cqe),
  346. }) {
  347. err: error => disconnect_err(ctx, client, err),
  348. void => void,
  349. };
  350. };
  351. ```
  352. *What's the difference between match and switch? The former works with types,
  353. and switch works with values. We might attempt to merge these before the
  354. language's release, but for now the distinction simplifies our design.*
  355. I've structured the client state machine into four states based on the kind of
  356. I/O they handle, plus a special case for error handling:
  357. 1. Reading the query from the client
  358. 2. Opening the plan file for the requested user
  359. 3. Reading from the plan file
  360. 4. Forwarding its contents to the client
  361. ![](https://l.sr.ht/p5yc.svg)
  362. Each circle in this diagram represents a point where we will submit some I/O to
  363. our io_uring instance and return to the event loop. If any I/O resulted in an
  364. error, we'll follow the dotted line to the error path, which transmits the error
  365. to the user (and if an error occurs *during* error transmission, we'll
  366. immediately disconnect them, but that's not shown here).
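The state machine can be written down as a small transition table. This Python sketch is a simplified model, not the server's code: the `DONE` state is added here to stand in for "disconnected", the edge from WRITE_RESP back to READ_PLAN is an assumption (looping until the plan file hits EOF), and every failure is routed to the error path as the diagram describes.

```python
from enum import Enum, auto


class State(Enum):
    READ_QUERY = auto()
    OPEN_PLAN = auto()
    READ_PLAN = auto()
    WRITE_RESP = auto()
    WRITE_ERROR = auto()
    DONE = auto()  # not in the original enum; stands in for "disconnected"


# Happy-path transitions between I/O submissions.
NEXT = {
    State.READ_QUERY: {State.READ_QUERY, State.OPEN_PLAN},  # may need more data
    State.OPEN_PLAN: {State.READ_PLAN},
    State.READ_PLAN: {State.WRITE_RESP, State.DONE},  # DONE on EOF
    State.WRITE_RESP: {State.READ_PLAN},  # assumed: loop until EOF
    State.WRITE_ERROR: {State.DONE},
}


def advance(current, target, failed=False):
    if failed:
        # An error while already reporting an error means an immediate disconnect.
        return State.DONE if current is State.WRITE_ERROR else State.WRITE_ERROR
    if target not in NEXT[current]:
        raise ValueError(f"illegal transition {current.name} -> {target.name}")
    return target


s = State.READ_QUERY
for step in (State.OPEN_PLAN, State.READ_PLAN, State.WRITE_RESP,
             State.READ_PLAN, State.DONE):
    s = advance(s, step)
print(s.name)  # DONE
```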
  367. I need to give a simplified introduction to error handling in this new
  368. programming language before we move on, so let's take a brief detour. In this
  369. language, we require the user to explicitly do *something* about errors.
  370. Generally speaking, there are three somethings that you will do:
  371. - Some context-appropriate response to an error condition
  372. - Bumping the error up to the caller to deal with
  373. - Asserting that the error will never happen in practice
  374. The latter two options have special operators ("?" and "!", respectively, used
  375. as postfix operators on expressions which can fail), and the first option is
  376. handled manually in each situation as appropriate. It's usually most convenient
  377. to use ? to pass errors up the stack, but the buck has got to stop somewhere. In
  378. the code we've seen so far, we're in or near the main function &mdash; the top
  379. of the call stack &mdash; and so have to handle these errors manually, usually
  380. by terminating the program with "!". But, when a client causes an error, we
  381. cannot terminate the program without creating a DoS vulnerability. This
  382. "dispatch" function sets up common client error handling accordingly, allowing
  383. later functions to use the "?" operator to pass errors up to it.
  384. To represent the errors themselves, we use a lightweight approach to tagged
  385. unions, similar to a result type. Each error type, optionally with some extra
  386. metadata, is enumerated, along with any possible successful types, as part of a
  387. function's return type. The only difference between an error type and a normal
  388. type is that the former is denoted with a "!" modifier &mdash; so you can store
  389. any representable state in an error type.
  390. I also wrote an "errors" file which provides uniform error handling for all of
  391. the various error conditions we can expect to occur in this program. This
  392. includes all of the error conditions that we define ourselves, as well as any
  393. errors we expect to encounter from modules we depend on. The result looks like
  394. this:
  395. ```hare
  396. use fs;
  397. use io;
  398. use linux::io_uring;
  399. type unexpected_eof = !void;
  400. type invalid_query = !void;
  401. type no_such_user = !void;
  402. type relay_denied = !void;
  403. type max_query = !void;
  404. type error = !(
  405. io::error |
  406. fs::error |
  407. io_uring::error |
  408. unexpected_eof |
  409. invalid_query |
  410. no_such_user |
  411. relay_denied |
  412. max_query
  413. );
  414. fn strerror(err: error) const str = {
  415. match (err) {
  416. err: io::error => io::strerror(err),
  417. err: fs::error => fs::strerror(err),
  418. err: io_uring::error => io_uring::strerror(err),
  419. unexpected_eof => "Unexpected EOF",
  420. invalid_query => "Invalid query",
  421. no_such_user => "No such user",
  422. relay_denied => "Relay access denied",
  423. max_query => "Maximum query length exceeded",
  424. };
  425. };
  426. ```
  427. With an understanding of error handling, we can re-read the dispatch function's
  428. common error handling for all client issues:
  429. ```hare
  430. fn dispatch(ctx: *context, client: *client, cqe: *io_uring::cqe) void = {
  431. match (switch (client.state) {
  432. state::READ_QUERY => client_query(ctx, client, cqe),
  433. state::OPEN_PLAN => client_open_plan(ctx, client, cqe),
  434. state::READ_PLAN => client_read_plan(ctx, client, cqe),
  435. state::WRITE_RESP, state::WRITE_ERROR =>
  436. client_write_resp(ctx, client, cqe),
  437. }) {
  438. err: error => disconnect_err(ctx, client, err),
  439. void => void,
  440. };
  441. };
  442. ```
  443. Each dispatched-to function returns a tagged union of (void | error), the latter
  444. being our common error type. If they return void, we do nothing, but if an error
  445. occurred, we call "disconnect_err".
  446. ```hare
  447. fn disconnect_err(ctx: *context, client: *client, err: error) void = {
  448. fmt::errorfln("{}: Disconnecting with error: {}",
  449. ip::string(client.addr), strerror(err))!;
  450. const forward = match (err) {
  451. (unexpected_eof | invalid_query | no_such_user
  452. | relay_denied | max_query) => true,
  453. * => false,
  454. };
  455. if (!forward) {
  456. disconnect(ctx, client);
  457. return;
  458. };
  459. client.buf = client.xbuf[..];
  460. const s = fmt::bsprintf(client.buf, "Error: {}\r\n", strerror(err));
  461. client.buf = client.buf[..len(s)];
  462. client.state = state::WRITE_ERROR;
  463. submit_write(ctx, client, client.fd);
  464. };
  465. fn disconnect(ctx: *context, client: *client) void = {
  466. const sqe = io_uring::must_get_sqe(ctx.uring);
  467. io_uring::close(sqe, client.fd);
  468. if (client.plan_fd != -1) {
  469. const sqe = io_uring::must_get_sqe(ctx.uring);
  470. io_uring::close(sqe, client.plan_fd);
  471. };
  472. let i = 0z;
  473. for (i < len(ctx.clients); i += 1) {
  474. if (ctx.clients[i] == client) {
  475. break;
  476. };
  477. };
  478. delete(ctx.clients[i]);
  479. free(client);
  480. };
  481. ```
  482. We log the error here, and for certain kinds of errors, we "forward" them to the
  483. client by writing them to our client buffer and going into the "WRITE_ERROR"
  484. state. For other errors, we just drop the connection.
  485. The disconnect function, which disconnects the client immediately, queues
  486. io_uring submissions to close the open file descriptors associated with it, and
  487. then removes it from the list of clients.
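The split between forwarded and dropped errors boils down to a lookup. In this Python sketch the string keys and the function name `error_response` are illustrative stand-ins for the error types above; the messages and the "Error: ...\r\n" framing are taken from the code shown.

```python
# Errors that are safe and useful to report back to the client.
FORWARDED = {
    "unexpected_eof": "Unexpected EOF",
    "invalid_query": "Invalid query",
    "no_such_user": "No such user",
    "relay_denied": "Relay access denied",
    "max_query": "Maximum query length exceeded",
}


def error_response(kind):
    """Return the bytes to send to the client, or None to just drop the connection."""
    msg = FORWARDED.get(kind)
    if msg is None:
        return None  # e.g. an internal I/O error: log it and disconnect
    return f"Error: {msg}\r\n".encode("ascii")


print(error_response("no_such_user"))
print(error_response("io_error"))
```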
  488. Let's get back to the happy path. Remember the read SQE we submitted when the
  489. client established the connection? When the CQE comes in, the state machine
  490. directs us into this function:
  491. ```hare
  492. fn client_query(ctx: *context, client: *client, cqe: *io_uring::cqe) (void | error) = {
  493. const r = io_uring::result(cqe)?;
  494. if (r <= 0) {
  495. return unexpected_eof;
  496. };
  497. const r = r: size;
  498. if (len(client.buf) + r > len(client.xbuf) / 2) {
  499. return max_query;
  500. };
  501. client.buf = client.xbuf[..len(client.buf) + r];
  502. // The RFC requires queries to use CRLF, but it is also one of the few
  503. // RFCs which explicitly reminds you to, quote, "as with anything in the
  504. // IP protocol suite, 'be liberal in what you accept'", so we accept LF
  505. // as well.
  506. let lf = match (bytes::index(client.buf, '\n')) {
  507. z: size => z,
  508. void => {
  509. if (len(client.buf) == len(client.xbuf) / 2) {
  510. return max_query;
  511. };
  512. submit_read(ctx, client, client.fd, 0);
  513. return;
  514. },
  515. };
  516. if (lf > 0 && client.buf[lf - 1] == '\r': u8) {
  517. lf -= 1; // CRLF
  518. };
  519. const query = match (strings::try_fromutf8(client.buf[..lf])) {
  520. * => return invalid_query,
  521. q: str => q,
  522. };
  523. fmt::printfln("{}: finger {}", ip::string(client.addr), query)!;
  524. const plan = process_query(ctx, query)?;
  525. defer free(plan);
  526. client.plan_path = strings::to_c(plan);
  527. const sqe = io_uring::must_get_sqe(ctx.uring);
  528. io_uring::openat(sqe, rt::AT_FDCWD, client.plan_path, rt::O_RDONLY, 0);
  529. io_uring::set_user(sqe, client);
  530. client.state = state::OPEN_PLAN;
  531. };
  532. ```
  533. The first half of this function figures out if we've received a full line,
  534. including CRLF. The second half parses this line as a finger query and prepares
  535. to fulfill the enclosed request.
  536. The read operation behaves like the read(2) syscall, which returns 0 on EOF. We
  537. aren't expecting an EOF in this state, so if we see this, we boot them out. We
  538. also have a cap on our buffer length, so we return the max_query error if it's
  539. been exceeded. Otherwise, we look for a line feed. If there isn't one, we submit
  540. another read to get more from the client, but if a line feed is there, we trim
  541. off a carriage return (if present) and decode the completed query as a UTF-8
  542. string.
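The line-framing logic is independent of io_uring and can be sketched on its own. This Python version uses hypothetical names (`extract_query`, `QueryError`, `MAX_QUERY`); the 1024-byte cap mirrors half of the 2048-byte buffer above. It returns `None` where the server would submit another read.

```python
MAX_QUERY = 1024  # half of the 2048-byte buffer in the article


class QueryError(Exception):
    pass


def extract_query(buf):
    """Return the decoded query, or None if more data is needed."""
    if len(buf) > MAX_QUERY:
        raise QueryError("Maximum query length exceeded")
    lf = buf.find(b"\n")
    if lf == -1:
        if len(buf) == MAX_QUERY:
            raise QueryError("Maximum query length exceeded")
        return None  # no full line yet: read more
    if lf > 0 and buf[lf - 1] == 0x0D:
        lf -= 1  # drop the CR of a CRLF pair
    try:
        return buf[:lf].decode("utf-8")
    except UnicodeDecodeError:
        raise QueryError("Invalid query") from None


print(extract_query(b"drew\r\n"))  # drew
```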
  543. We call "process_query" (using the error propagation operator to bubble up
  544. errors), which returns the path to the requested user's ~/.plan file. We'll look
  545. at the guts of that function in a moment. The return value is heap allocated, so
  546. we defer a free for later.
  547. Strings in our language are not null terminated, but io_uring expects them to
  548. be. This is another case which will be addressed transparently once we build a
  549. higher-level, portable interface. For now, though, we need to call
  550. "strings::to_c" ourselves, and stash it on the client struct. It's heap
  551. allocated, so we'll free it in the next state when the I/O submission completes.
  552. Speaking of which, we finish this process after preparing the next I/O operation
  553. &mdash; opening the plan file &mdash; and setting the client state to the next
  554. step in the state machine.
  555. Before we move on, though, I promised that we'd talk about the process_query
  556. function. Here it is in all of its crappy glory:
  557. ```hare
  558. use path;
  559. use strings;
  560. use unix::passwd;
  561. fn process_query(ctx: *context, q: str) (str | error) = {
  562. if (strings::has_prefix(q, "/W") || strings::has_prefix(q, "/w")) {
  563. q = strings::sub(q, 2, strings::end);
  564. for (strings::has_prefix(q, " ") || strings::has_prefix(q, "\t")) {
  565. q = strings::sub(q, 1, strings::end);
  566. };
  567. };
  568. if (strings::contains(q, '@')) {
  569. return relay_denied;
  570. };
  571. const user = q;
  572. const pwent = match (passwd::getuser(user)) {
  573. void => return no_such_user,
  574. p: passwd::pwent => p,
  575. };
  576. defer passwd::pwent_finish(pwent);
  577. let enabled = false;
  578. for (let i = 0z; i < len(ctx.users); i += 1) {
  579. if (user == ctx.users[i]) {
  580. enabled = true;
  581. break;
  582. };
  583. };
  584. if (!enabled) {
  585. return no_such_user;
  586. };
  587. return path::join(pwent.homedir, ".plan");
  588. };
  589. ```
  590. The [grammar described in RFC 1288][grammar] is pretty confusing, but most of it
  591. is to support features I'm not interested in for this simple implementation,
  592. like relaying to other finger hosts or requesting additional information. I
  593. think I've "parsed" most of the useful bits here, and ultimately I'm aiming to
  594. end up with a single string: the username whose details we want. I grab the
  595. user's passwd entry and check if they're a member of the "finger" group we
  596. populated way up there in the first code sample. If so, we pull the path to
  597. their homedir out of the passwd entry, join it with ".plan", and send it up the
  598. chain.
  599. [grammar]: https://datatracker.ietf.org/doc/html/rfc1288#section-2.3
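The same query handling, sketched in Python for readers who want to poke at the edge cases. The function name `plan_path`, the exception `FingerError`, and the two lookup arguments are hypothetical stand-ins for the passwd and group lookups; `posixpath` is used so the result does not depend on the host platform.

```python
import posixpath


class FingerError(Exception):
    pass


def plan_path(query, enabled_users, homedirs):
    """Map a finger query to the path of the user's .plan file."""
    # Optional /W (verbose) prefix, which this server ignores.
    if query[:2] in ("/W", "/w"):
        query = query[2:].lstrip(" \t")
    # Anything with an @ asks us to relay to another host.
    if "@" in query:
        raise FingerError("Relay access denied")
    home = homedirs.get(query)
    if home is None or query not in enabled_users:
        # Same answer for "unknown" and "not opted in".
        raise FingerError("No such user")
    return posixpath.join(home, ".plan")


print(plan_path("/W drew", {"drew"}, {"drew": "/home/drew"}))  # /home/drew/.plan
```

Returning the same error for unknown users and for users outside the group means the server does not reveal which accounts exist.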
  600. At this point we've received, validated, and parsed the client's query, and
  601. looked up the plan file we need. The next step is to open the plan file, which
  602. is where we left off at the end of the last function. The I/O we prepared there
  603. takes us here when it completes:
  604. ```hare
  605. fn client_open_plan(
  606. ctx: *context,
  607. client: *client,
  608. cqe: *io_uring::cqe,
  609. ) (void | error) = {
  610. free(client.plan_path);
  611. client.plan_fd = io_uring::result(cqe)?;
  612. client.buf = client.xbuf[..0];
  613. client.state = state::READ_PLAN;
  614. submit_read(ctx, client, client.plan_fd, -1);
  615. };
  616. ```
  617. By now, this should be pretty comprehensible. I will clarify what the "[..0]"
  618. syntax does here, though. This language has slices, which store a pointer to an
  619. array, a length, and a capacity. In our client state, xbuf is a fixed-length
  620. array which provides the actual storage, and "buf" is a slice of that array,
  621. which acts as a kind of cursor, telling us what portion of the buffer is valid.
  622. The result of this expression is to take a slice up to, but not including, the
  623. 0th item of that array &mdash; in other words, an empty slice. The address and
  624. capacity of the slice still reflect the traits of the underlying array, however,
  625. which is what we want.
  626. We're now ready to read data out of the user's plan file. We submit a read
  627. operation for that file descriptor, and when it completes, we'll end up here:
  628. ```hare
  629. fn client_read_plan(
  630. ctx: *context,
  631. client: *client,
  632. cqe: *io_uring::cqe,
  633. ) (void | error) = {
  634. const r = io_uring::result(cqe)?;
  635. if (r == 0) {
  636. disconnect(ctx, client);
  637. return;
  638. };
  639. client.buf = client.xbuf[..r];
  640. // Convert LF to CRLF
  641. //
  642. // We always read a maximum of the length of xbuf over two so that we
  643. // have room to insert these.
  644. let seencrlf = false;
  645. for (let i = 0z; i < len(client.buf); i += 1) {
  646. switch (client.buf[i]) {
  647. '\r' => seencrlf = true,
  648. '\n' => {
  649. if (!seencrlf) { static insert(client.buf[i], '\r'); i += 1; };
  650. seencrlf = false; // a CR only pairs with the LF right after it
  651. },
  652. * => seencrlf = false,
  653. };
  654. };
  655. client.state = state::WRITE_RESP;
  656. submit_write(ctx, client, client.fd);
  657. };
  658. ```
  659. Again, the read operation for io_uring behaves similarly to the read(2) syscall,
  660. so it returns the number of bytes read. If this is zero, indicating EOF, we can
  661. terminate the state machine and disconnect the client (this is a nominal
  662. disconnect, so we don't use disconnect_err here). If it's nonzero, we set our
  663. buffer slice to the subset of the buffer which represents the data io_uring has
  664. read.
  665. The Finger RFC requires all data to use CRLF for line endings, and this is where
  666. we deal with it. Remember earlier when I noted that we only ever used half of
  667. the read buffer? This is why: if we read 1024 newlines from the plan file, we
  668. will need another 1024 bytes to insert carriage returns. Because we've planned
  669. for and measured out our memory requirements in advance, we can use "static
  670. insert" here. This built-in works similarly to how insert normally works, but it
  671. will never re-allocate the underlying array. Instead, it asserts that the
  672. insertion would not require a re-allocation, and if it turns out that you did
  673. the math wrong, it aborts the program instead of overflowing the buffer. But we did
  674. the math and it works out, so it saves us from an extra allocation.
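For comparison, here's a minimal C sketch of the same in-place conversion. It is not the server's code: `lf_to_crlf` is a hypothetical helper, and the `assert` stands in for the check that "static insert" performs, aborting rather than writing past the end of the buffer.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Converts bare LF to CRLF in place. buf holds len valid bytes and has
 * room for cap bytes in total. Returns the new length. Never allocates:
 * if there is no room for another byte, it aborts.
 */
static size_t lf_to_crlf(unsigned char *buf, size_t len, size_t cap) {
	int seencr = 0;
	for (size_t i = 0; i < len; i++) {
		if (buf[i] == '\r') {
			seencr = 1;
			continue;
		}
		if (buf[i] == '\n' && !seencr) {
			assert(len < cap); /* did we do the math right? */
			/* Shift the tail right by one and drop in the CR */
			memmove(&buf[i + 1], &buf[i], len - i);
			buf[i] = '\r';
			len++;
			i++; /* step over the LF we just shifted */
		}
		seencr = 0;
	}
	return len;
}
```

The worst case is a buffer made up entirely of bare newlines, which doubles in size. That's why reading at most half of the capacity is enough to guarantee the assertion never fires.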
  675. Capping this off, we submit a write to transmit this buffer to the client.
  676. "submit_write" is quite similar to submit_read:
  677. ```hare
  678. fn submit_write(ctx: *context, client: *client, fd: int) void = {
  679. 	const sqe = io_uring::must_get_sqe(ctx.uring);
  680. 	io_uring::write(sqe, fd, client.buf: *[*]u8, len(client.buf),
  681. 		0, flags::IO_LINK);
  682. 	io_uring::set_user(sqe, client);
  683. 	let ts = rt::timespec { ... };
  684. 	time::instant_to_timespec(client.deadline, &ts);
  685. 	const sqe = io_uring::must_get_sqe(ctx.uring);
  686. 	io_uring::link_timeout(sqe, &ts, timeout_flags::ABS);
  687. };
  688. ```
  689. Ideally, this should not require explanation. From here we transition to the
  690. WRITE_RESP state, so when the I/O completes we end up here:
  691. ```hare
  692. fn client_write_resp(
  693. 	ctx: *context,
  694. 	client: *client,
  695. 	cqe: *io_uring::cqe,
  696. ) (void | error) = {
  697. 	const r = io_uring::result(cqe)?: size;
  698. 	if (r < len(client.buf)) {
  699. 		client.buf = client.buf[r..];
  700. 		submit_write(ctx, client, client.fd);
  701. 		return;
  702. 	};
  703. 	if (client.state == state::WRITE_ERROR) {
  704. 		disconnect(ctx, client);
  705. 		return;
  706. 	};
  707. 	client.buf = client.xbuf[..0];
  708. 	client.state = state::READ_PLAN;
  709. 	submit_read(ctx, client, client.plan_fd, -1);
  710. };
  711. ```
  712. First, we check if we need to repeat this process: if we have written less than
  713. the size of the buffer, then we advance the slice by that much and submit
  714. another write.
  715. We can arrive at the next bit for two reasons: because "client.buf" includes a
  716. fragment of a plan file which has been transmitted to the client, which we just
  717. covered, or because it is the error message buffer prepared by "disconnect_err",
  718. which we discussed earlier. The dispatch function will bring us here for both
  719. the normal and error states, and we distinguish between them with this second if
  720. statement. If we're sending the plan file, we submit a read for the next
  721. buffer-ful of plan. But our error messages always fit into one buffer, so once
  722. that buffer has been fully written we can just disconnect in the error case.
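The short-write handling is the part that's easiest to get wrong, so here's a small C sketch of just that bookkeeping, with the I/O taken out. The `pending` struct and `write_completed` are hypothetical names standing in for the "buf" slice and the first if statement of the handler above.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the "buf" slice: the bytes not yet written. */
struct pending {
	const unsigned char *data;
	size_t len;
};

/*
 * Called with r, the byte count reported by a write completion. Advances
 * the pending region past what was written and returns 1 if another write
 * must be submitted, or 0 once the whole buffer has gone out.
 */
static int write_completed(struct pending *p, size_t r) {
	assert(r <= p->len);
	p->data += r; /* same idea as client.buf = client.buf[r..] */
	p->len -= r;
	return p->len > 0;
}
```

If a 7-byte response is reported as 3 bytes written, the first call returns 1 with 4 bytes still pending. A second completion for those 4 bytes returns 0, and only then does the state machine move on.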
  723. And that's it! That completes our state machine, and I'm pretty sure we've read
  724. the entire program's source code by this point. Pretty neat, huh? io_uring is
  725. quite interesting. I plan on using this as a little platform upon which I can
  726. further test our io_uring implementation and develop a portable async I/O
  727. abstraction. We haven't implemented a DNS resolver for the stdlib yet, but I'll
  728. also be writing a finger client (using synchronous I/O this time) once we do.
  729. If we really wanted to max out the performance for a CLOUD SCALE WEB 8.0 XTREME
  730. PERFORMANCE finger server, we could try a few additional improvements:
  731. - Adding an internal queue for clients until we have room for their I/O in the SQ
  732. - Using a shared buffer pool with the kernel, with io_uring ops like READ_FIXED
  733. - Batching requests for the same plan file by only answering requests for it
  734. every N milliseconds (known to some as the "data loader" pattern)
  735. - More slow loris mitigations, such as limiting open connections per IP address
  736. It would also be cool to handle SIGHUP to reload our finger group membership
  737. list without restarting the daemon. I would say "patches welcome", but I won't
  738. share the git repo until the language is ready. And the code is GPL'd, but not
  739. AGPL'd, so you aren't entitled to it if you finger me!