drewdevault.com

[mirror] blog and personal website of Drew DeVault git clone https://hacktivis.me/git/mirror/drewdevault.com.git

  1. ---
  2. title: Using io_uring to make a high-performance... finger server
  3. date: 2021-05-24
  4. ---
  5. I'm working on adding a wrapper for the [Linux io_uring interface][0] to my
  6. [secret programming language project][1]. To help learn more about io_uring and
  7. to test out the interface I was designing, I needed a small project whose design
  8. was well-suited for the value-add of io_uring. The [Finger protocol][2] is
  9. perfect for this! After being designed in the 70's and then completely forgotten
  10. about for 50 years, it's the perfect small and simple network protocol to test
  11. drive this new interface with.
  12. [0]: https://unixism.net/loti/what_is_io_uring.html
  13. [1]: https://drewdevault.com/2021/03/19/A-new-systems-language.html
  14. [2]: https://en.wikipedia.org/wiki/Finger_protocol
  15. In short, finger will reach out to a remote server and ask for information about
  16. a user. It was used back in the day to find contact details like the user's
  17. phone number, office address, email address, sometimes their favorite piece of
  18. ASCII art, and, later, a summary of the things they were working on at the
  19. moment. The somewhat provocative name allegedly comes from an older usage of
  20. the word to mean "a snitch" or a member of the FBI. The last useful RFC related
  21. to Finger is [RFC 1288][3], circa 1991, which will be our reference for this
  22. server. If you want to give it a test drive, try this to ping the server we'll
  23. be discussing today:
  24. ```
  25. printf 'drew\r\n' | nc drewdevault.com 79
  26. ```
  27. You might also have the finger command installed locally (try running "finger
  28. drew\@drewdevault.com"), and you can try out the [Castor][4] browser by sourcehut
  29. user ~julienxx for a graphical experience.
  30. [3]: https://datatracker.ietf.org/doc/html/rfc1288
  31. [4]: https://sr.ht/~julienxx/Castor/
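For readers without nc or a finger client at hand, the same exchange can be sketched in a few lines of Python. This is only an illustration of the wire format described above (a query line terminated by CRLF, then read until the server closes the connection); the helper names `build_query` and `finger` are made up for this example.

```python
import socket


def build_query(user: str) -> bytes:
    """Encode a finger query line: the user name followed by CRLF."""
    return user.encode("utf-8") + b"\r\n"


def finger(user: str, host: str, port: int = 79, timeout: float = 10.0) -> str:
    """Send one query and return everything the server writes before closing."""
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(build_query(user))
        chunks = []
        while True:
            data = sock.recv(4096)
            if not data:  # the server closed the connection: response complete
                break
            chunks.append(data)
    return b"".join(chunks).decode("utf-8", errors="replace")
```

Calling `print(finger("drew", "drewdevault.com"))` should behave like the nc one-liner, assuming the server is reachable.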
  32. And what is io_uring? It is the latest interface for async I/O on Linux, and
  33. it's pretty innovative and interesting. The basic idea is to set up some memory
  34. which is shared between the kernel and the userspace program, and stash a couple
  35. of ring buffers there that can be updated with atomic writes. Userspace appends
  36. submission queue entries (SQEs) to the submission queue (SQ), and the kernel
  37. processes the I/O requests they describe and then appends completion queue
  38. events (CQEs) to the completion queue (CQ). Interestingly, both sides can see
  39. this happening *without* entering the kernel with a syscall, which is a major
  40. performance boost. It more or less solves the async I/O problem for Linux, which
  41. Linux (and Unix at large) has struggled to do for a long time.
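To make the two-queue idea concrete, here is a toy Python model. It is not the kernel's data layout and it does no real I/O: `Ring` and `fake_kernel` are hypothetical names, and the "kernel" is an ordinary function. It only shows how a producer and a consumer can share a ring by each advancing its own index.

```python
class Ring:
    """A single-producer, single-consumer ring with free-running indices."""

    def __init__(self, entries: int) -> None:
        # Power-of-two sizes let us wrap an index with a bit mask.
        assert entries > 0 and entries & (entries - 1) == 0
        self.slots = [None] * entries
        self.mask = entries - 1
        self.head = 0  # advanced only by the consumer
        self.tail = 0  # advanced only by the producer

    def push(self, item) -> bool:
        if self.tail - self.head == len(self.slots):
            return False  # ring is full
        self.slots[self.tail & self.mask] = item
        self.tail += 1  # publishing the new tail makes the entry visible
        return True

    def pop(self):
        if self.head == self.tail:
            return None  # ring is empty
        item = self.slots[self.head & self.mask]
        self.head += 1
        return item


def fake_kernel(sq: Ring, cq: Ring) -> None:
    """Stand-in for the kernel: turn each SQE into a CQE with the same user data."""
    while (sqe := sq.pop()) is not None:
        op, user_data = sqe
        cq.push((user_data, f"{op} done"))
```

In the real interface the indices live in memory mapped into both the kernel and the program, and they are updated with atomic stores so each side can observe the other's progress.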
  42. With that background in place, I'm going to walk you through my finger
  43. server's code. Given that this is written in an as-of-yet unreleased programming
  44. language, I'll do my best to help you decipher the alien code.
  45. <details>
  46. <summary>A quick disclaimer</summary>
  47. <p>
  48. This language, the standard library, and the interface provided by
  49. linux::io_uring, are all works in progress and are subject to change. In
  50. particular, this program will become obsolete when we design a portable I/O
  51. bus interface, which on Linux will be backed by io_uring but on other systems
  52. will use kqueue, poll, etc.
  53. <p>
  54. As a rule of thumb, anything which uses rt:: or linux:: is likely to change or
  55. be moved behind a portable abstraction in the future.
  56. </details>
  57. Let's start with the basics:
  58. <!--
  59. Yes, it's called Hare. Now keep that to yourself.
  60. -->
  61. ```hare
  62. use fmt;
  63. use getopt;
  64. use net::ip;
  65. use strconv;
  66. use unix::passwd;
  67. def MAX_CLIENTS: uint = 128;
  68. export fn main() void = {
  69. let addr: ip::addr = ip::ANY_V6;
  70. let port = 79u16;
  71. let group = "finger";
  72. const cmd = getopt::parse(os::args,
  73. "finger server",
  74. ('B', "addr", "address to bind to (default: all)"),
  75. ('P', "port", "port to bind to (default: 79)"),
  76. ('g', "group", "user group enabled for finger access (default: finger)"));
  77. defer getopt::finish(&cmd);
  78. for (let i = 0z; i < len(cmd.opts); i += 1) {
  79. const opt = cmd.opts[i];
  80. switch (opt.0) {
  81. 'B' => match (ip::parse(opt.1)) {
  82. a: ip::addr => addr = a,
  83. ip::invalid => fmt::fatal("Invalid IP address"),
  84. },
  85. 'P' => match (strconv::stou16(opt.1)) {
  86. u: u16 => port = u,
  87. strconv::invalid => fmt::fatal("Invalid port"),
  88. strconv::overflow => fmt::fatal("Port exceeds range"),
  89. },
  90. 'g' => group = opt.1,
  91. };
  92. };
  93. const grent = match (passwd::getgroup(group)) {
  94. void => fmt::fatal("No '{}' group available", group),
  95. gr: passwd::grent => gr,
  96. };
  97. defer passwd::grent_finish(grent);
  98. };
  99. ```
  100. None of this code is related to io_uring or finger; it just handles some
  101. initialization work. This is the daemon program, and it will accept some basic
  102. configuration via the command line. The getopt configuration shown here will
  103. produce the following help string:
  104. ```
  105. $ fingerd -h
  106. fingerd: finger server
  107. Usage: ./fingerd [-B <addr>] [-P <port>] [-g <group>]
  108. -B <addr>: address to bind to (default: all)
  109. -P <port>: port to bind to (default: 79)
  110. -g <group>: user group enabled for finger access (default: finger)
  111. ```
  112. The basic idea is to make finger access opt-in for a given Unix account by
  113. adding them to the "finger" group. The "passwd::getgroup" lookup fetches that
  114. entry from /etc/group to identify the list of users for whom we should be
  115. serving finger access.
  116. ```hare
  117. let serv = match (net::listen(addr, port,
  118. 256: net::backlog, net::reuseport)) {
  119. err: io::error => fmt::fatal("listen: {}", io::strerror(err)),
  120. l: *net::listener => l,
  121. };
  122. defer net::shutdown(serv);
  123. fmt::printfln("Server running on :{}", port)!;
  124. ```
  125. Following this, we set up a TCP listener. I went for a backlog of 256
  126. connections (overkill for a finger server, but hey), and set reuseport so you
  127. can achieve CLOUD SCALE by running several daemons at once.
  128. Next, I set up the io_uring that we'll be using:
  129. ```hare
  130. // The ring size is 2 for the accept and sigfd read, plus 2 SQEs for
  131. // each of up to MAX_CLIENTS: either read/write plus a timeout, or up to
  132. // two close SQEs during cleanup.
  133. static assert(MAX_CLIENTS * 2 + 2 <= io_uring::MAX_ENTRIES);
  134. let params = io_uring::params { ... };
  135. let ring = match (io_uring::setup(MAX_CLIENTS * 2 + 2, &params)) {
  136. ring: io_uring::io_uring => ring,
  137. err: io_uring::error => fmt::fatal(io_uring::strerror(err)),
  138. };
  139. defer io_uring::finish(&ring);
  140. ```
  141. If we were running this as root (and we often are, given that fingerd binds to
  142. port 79 by default), we could go syscall-free by adding
  143. `io_uring::setup_flags::SQPOLL` to `params.flags`, but this requires more
  144. testing on my part so I have not added it yet. With this configuration, we'll
  145. need to use the `io_uring_enter` syscall to submit I/O requests.
  146. We also have to pick a queue size when setting up the uring. I planned this out
  147. so that we can have two SQEs in flight for every client at once &mdash; either
  148. a read/write request and its corresponding timeout, or the two "close"
  149. requests used when disconnecting the client &mdash; plus two extra entries, one
  150. for the "accept" call, and another to wait for signals from a signalfd.
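The sizing arithmetic can be checked with a short Python calculation. The rounding step is an assumption worth verifying against the io_uring_setup documentation: as far as I know, the kernel rounds the requested entry count up to the next power of two. The function names are invented for this example.

```python
MAX_CLIENTS = 128


def ring_entries(max_clients: int) -> int:
    per_client = 2  # read/write plus its linked timeout, or two closes
    shared = 2      # one accept plus one signalfd poll
    return max_clients * per_client + shared


def next_power_of_two(n: int) -> int:
    """Smallest power of two that is >= n, for n >= 1."""
    return 1 << (n - 1).bit_length()


requested = ring_entries(MAX_CLIENTS)      # 258
allocated = next_power_of_two(requested)   # 512, if the rounding assumption holds
```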
  151. Speaking of signalfds:
  152. ```hare
  153. let mask = rt::sigset { ... };
  154. rt::sigaddset(&mask, rt::SIGINT)!;
  155. rt::sigaddset(&mask, rt::SIGTERM)!;
  156. rt::sigprocmask(rt::SIG_BLOCK, &mask, null)!;
  157. let sigfd = signalfd::signalfd(-1, &mask, 0)!;
  158. defer rt::close(sigfd)!;
  159. const files = [net::listenerfd(serv) as int, sigfd];
  160. io_uring::register_files(&ring, files)!;
  161. const sqe = io_uring::must_get_sqe(&ring);
  162. io_uring::poll_add(sqe, 1, rt::POLLIN: uint, flags::FIXED_FILE);
  163. io_uring::set_user(sqe, &sigfd);
  164. ```
  165. We haven't implemented a high-level signal interface yet, so this is just using
  166. the syscall wrappers. I chose to use a signalfd here so I can monitor for SIGINT
  167. and SIGTERM with my primary I/O event loop, to (semi-)gracefully[^1] terminate
  168. the server.
  169. [^1]: Right now the implementation drops all in-flight requests during shutdown. If we wanted to be even more graceful, it would be pretty easy to stop accepting new connections and do a soft shutdown while we finish servicing any active clients. net::reuseport would allow us to provide zero downtime during reboots with this approach, since another daemon could continue servicing users while this one is shutting down.
  170. This also happens to show off our first SQE submission. "must_get_sqe" will
  171. fetch the next SQE, asserting that there is one available, which relies on the
  172. math I explained earlier when planning for our queue size. Then, we populate
  173. this SQE with a "poll_add" operation, which polls on fixed file descriptor 1
  174. (the second entry). The "register" call above adds the socket and signal file
  175. descriptors to the io_uring's list of "fixed" file descriptors, and so with
  176. "flags::FIXED_FILE" this refers to the signalfd.
  177. We also set the user_data field of the SQE with "set_user". This will be copied
  178. to the CQE later, and it's necessary that we provide a unique value in order to
  179. correlate the CQE back to the SQE it refers to. We can use any value, and the
  180. address of the signalfd variable is a convenient number we can use for this
  181. purpose.
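The correlation idea is independent of io_uring itself. A minimal Python sketch, using a counter instead of a memory address as the unique value (the `Pending` class is hypothetical and not part of any io_uring binding):

```python
import itertools


class Pending:
    """Track in-flight requests by a unique token, like user_data on an SQE."""

    def __init__(self) -> None:
        self._next = itertools.count(1)
        self._handlers = {}

    def submit(self, handler) -> int:
        """Remember who is waiting and return the token to attach to the request."""
        token = next(self._next)
        self._handlers[token] = handler
        return token

    def complete(self, token: int, result):
        """Route a completion back to whoever submitted the matching request."""
        handler = self._handlers.pop(token)
        return handler(result)
```

Using the address of a long-lived variable, as the server does, gives the same uniqueness guarantee without a lookup table, at the cost of comparing against each candidate address in the event loop.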
  182. There's one more step &mdash; submitting the SQE &mdash; but that'll wait until
  183. we set up more I/O. Next, I have set up a "context" structure which will store
  184. all of the state the server needs to work with, to be passed to functions
  185. throughout the program.
  186. ```hare
  187. type context = struct {
  188. users: []str,
  189. clients: []*client,
  190. uring: *io_uring::io_uring,
  191. };
  192. // ...
  193. const ctx = context {
  194. users = grent.userlist,
  195. uring = &ring,
  196. ...
  197. };
  198. ```
  199. The second "\..." towards the end is not for illustrative purposes - it sets all
  200. of the remaining fields to their default values (in this case, clients becomes
  201. an empty slice).
  202. Finally, this brings us to the main loop:
  203. ```hare
  204. let accept_waiting = false;
  205. for (true) {
  206. const peeraddr = rt::sockaddr { ... };
  207. const peeraddr_sz = size(rt::sockaddr): uint;
  208. if (!accept_waiting && len(ctx.clients) < MAX_CLIENTS) {
  209. const sqe = io_uring::must_get_sqe(ctx.uring);
  210. io_uring::accept(sqe, 0, &peeraddr, &peeraddr_sz,
  211. 0, flags::FIXED_FILE);
  212. io_uring::set_user(sqe, &peeraddr);
  213. accept_waiting = true;
  214. };
  215. io_uring::submit(&ring)!;
  216. let cqe = match (io_uring::wait(ctx.uring)) {
  217. err: io_uring::error => fmt::fatal("Error: {}",
  218. io_uring::strerror(err)),
  219. cqe: *io_uring::cqe => cqe,
  220. };
  221. defer io_uring::cqe_seen(&ring, cqe);
  222. const user = io_uring::get_user(cqe);
  223. if (user == &peeraddr) {
  224. accept(&ctx, cqe, &peeraddr);
  225. accept_waiting = false;
  226. } else if (user == &sigfd) {
  227. let si = signalfd::siginfo { ... };
  228. rt::read(sigfd, &si, size(signalfd::siginfo))!;
  229. fmt::errorln("Caught signal, terminating")!;
  230. break;
  231. } else for (let i = 0z; i < len(ctx.clients); i += 1) {
  232. let client = ctx.clients[i];
  233. if (user == client) {
  234. dispatch(&ctx, client, cqe);
  235. break;
  236. };
  237. };
  238. };
  239. ```
  240. At each iteration, assuming we have room and aren't already waiting on a new
  241. connection, we submit an "accept" SQE to fetch the next incoming client. This
  242. SQE accepts an additional parameter to write the client's IP address to, which
  243. we provide via a pointer to our local peeraddr variable.
  244. We call "submit" at the heart of the loop to submit any SQEs we have pending
  245. (including both the signalfd poll and the accept call, but also anything our
  246. future client handling code will submit) to the io_uring, then wait for the next CQE
  247. from the kernel.
  248. When we get one, we defer a "cqe_seen", which will execute at the end of the
  249. current scope (i.e. the end of this loop iteration) to advance our end of the
  250. completion queue, then figure out what I/O request was completed. The code
  251. earlier sets up SQEs for the accept and signalfd, which we check here. If a
  252. signal comes in, we read the details to acknowledge it and then terminate the
  253. loop. We also check if the user data was set to the address of any client state
  254. data, which we'll use to dispatch for client-specific I/O later on. If a new
  255. connection comes in:
  256. ```hare
  257. fn accept(ctx: *context, cqe: *io_uring::cqe, peeraddr: *rt::sockaddr) void = {
  258. const fd = match (io_uring::result(cqe)) {
  259. err: io_uring::error => fmt::fatal("Error: accept: {}",
  260. io_uring::strerror(err)),
  261. fd: int => fd,
  262. };
  263. const peer = net::ip::from_native(*peeraddr);
  264. const now = time::now(time::clock::MONOTONIC);
  265. const client = alloc(client {
  266. state = state::READ_QUERY,
  267. deadline = time::add(now, 10 * time::SECOND),
  268. addr = peer.0,
  269. fd = fd,
  270. plan_fd = -1,
  271. ...
  272. });
  273. append(ctx.clients, client);
  274. submit_read(ctx, client, client.fd, 0);
  275. };
  276. ```
  277. This is fairly self-explanatory, but we do see the first example of how to
  278. determine the result from a CQE. The result field of the CQE structure the
  279. kernel fills in is set to what would normally be the return value of the
  280. equivalent syscall, and "linux::io_uring::result" is a convenience function
  281. which translates negative values (i.e. errno) into a more idiomatic result type.
  282. We choose a deadline here, 10 seconds from when the connection is established,
  283. for the entire exchange to be completed by. This helps to mitigate
  284. [Slowloris][loris] attacks, though there are more mitigations we could implement
  285. for this.
  286. [loris]: https://en.wikipedia.org/wiki/Slowloris_(computer_security)
  287. Our client state is handled by a state machine, which starts in the
  288. "READ_QUERY" state. Per the RFC, the client will be sending us a query, followed
  289. by a CRLF. Our initial state is prepared to handle this. The full client state
  290. structure is as follows:
  291. ```hare
  292. type state = enum {
  293. READ_QUERY,
  294. OPEN_PLAN,
  295. READ_PLAN,
  296. WRITE_RESP,
  297. WRITE_ERROR,
  298. };
  299. type client = struct {
  300. state: state,
  301. deadline: time::instant,
  302. addr: ip::addr,
  303. fd: int,
  304. plan_fd: int,
  305. plan_path: *const char,
  306. xbuf: [2048]u8,
  307. buf: []u8,
  308. };
  309. ```
  310. Each field will be explained in due time. We add this to our list of active
  311. connections and call "submit_read".
  312. ```hare
  313. fn submit_read(ctx: *context, client: *client, fd: int, offs: size) void = {
  314. const sqe = io_uring::must_get_sqe(ctx.uring);
  315. const maxread = len(client.xbuf) / 2;
  316. io_uring::read(sqe, fd, client.xbuf[len(client.buf)..]: *[*]u8,
  317. maxread - len(client.buf), offs: u64, flags::IO_LINK);
  318. io_uring::set_user(sqe, client);
  319. let ts = rt::timespec { ... };
  320. time::instant_to_timespec(client.deadline, &ts);
  321. const sqe = io_uring::must_get_sqe(ctx.uring);
  322. io_uring::link_timeout(sqe, &ts, timeout_flags::ABS);
  323. };
  324. ```
  325. I've prepared two SQEs here. The first is a read, which will fill half of the
  326. client buffer with whatever they send us over the network (why half? I'll
  327. explain later). It's configured with "flags::IO_LINK", which will link it to the
  328. following request: a timeout. This will cause the I/O to be cancelled if it
  329. doesn't complete before the deadline we set earlier. "timeout_flags::ABS"
  330. specifies that the timeout is an absolute timestamp rather than a duration
  331. computed from the time of I/O submission.
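The difference between an absolute deadline and a per-operation duration is easy to show in Python. In this sketch (the `Deadline` class is a made-up helper, with an injectable clock so the behaviour is reproducible), every operation in the exchange shares one deadline, so a slow client cannot earn a fresh ten seconds with each read.

```python
import time
from typing import Optional


class Deadline:
    """An absolute point on the monotonic clock, fixed when the client connects."""

    def __init__(self, seconds: float, now: Optional[float] = None) -> None:
        start = time.monotonic() if now is None else now
        self.at = start + seconds

    def remaining(self, now: Optional[float] = None) -> float:
        current = time.monotonic() if now is None else now
        return max(0.0, self.at - current)

    def expired(self, now: Optional[float] = None) -> bool:
        return self.remaining(now) == 0.0


# Connection accepted at t=100 with a 10 second budget for the whole exchange.
deadline = Deadline(10, now=100.0)
first_read_budget = deadline.remaining(now=103.0)   # 7.0 seconds left
second_read_budget = deadline.remaining(now=109.0)  # 1.0 second left
```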
  332. I set the user data to the client state pointer, which will be used the next
  333. time we have a go-around in the main event loop (feel free to scroll back up if
  334. you want to re-read that bit). The event loop will send the CQE to the dispatch
  335. function, which will choose the appropriate action based on the current client
  336. state.
  337. ```hare
  338. fn dispatch(ctx: *context, client: *client, cqe: *io_uring::cqe) void = {
  339. match (switch (client.state) {
  340. state::READ_QUERY => client_query(ctx, client, cqe),
  341. state::OPEN_PLAN => client_open_plan(ctx, client, cqe),
  342. state::READ_PLAN => client_read_plan(ctx, client, cqe),
  343. state::WRITE_RESP, state::WRITE_ERROR =>
  344. client_write_resp(ctx, client, cqe),
  345. }) {
  346. err: error => disconnect_err(ctx, client, err),
  347. void => void,
  348. };
  349. };
  350. ```
  351. *What's the difference between match and switch? The former works with types,
  352. and switch works with values. We might attempt to merge these before the
  353. language's release, but for now the distinction simplifies our design.*
  354. I've structured the client state machine into four states based on the kind of
  355. I/O they handle, plus a special case for error handling:
  356. 1. Reading the query from the client
  357. 2. Opening the plan file for the requested user
  358. 3. Reading from the plan file
  359. 4. Forwarding its contents to the client
  360. ![](https://redacted.moe/f/2eb5650a.svg)
  361. Each circle in this diagram represents a point where we will submit some I/O to
  362. our io_uring instance and return to the event loop. If any I/O resulted in an
  363. error, we'll follow the dotted line to the error path, which transmits the error
  364. to the user (and if an error occurs *during* error transmission, we'll
  365. immediately disconnect them, but that's not shown here).
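As a rough model of the diagram, the transitions can be written as a small Python table. Two things here are assumptions rather than facts from the code shown so far: the `DONE` marker (the server simply disconnects instead), and the step from WRITE_RESP back to READ_PLAN to fetch the next chunk of the file. The model also ignores that READ_QUERY repeats until a full line arrives, and that only some errors are forwarded to the client.

```python
from enum import Enum, auto


class State(Enum):
    READ_QUERY = auto()
    OPEN_PLAN = auto()
    READ_PLAN = auto()
    WRITE_RESP = auto()
    WRITE_ERROR = auto()
    DONE = auto()  # hypothetical terminal marker, not in the server's enum


HAPPY_PATH = {
    State.READ_QUERY: State.OPEN_PLAN,
    State.OPEN_PLAN: State.READ_PLAN,
    State.READ_PLAN: State.WRITE_RESP,
    State.WRITE_RESP: State.READ_PLAN,  # assumption: loop for the next chunk
    State.WRITE_ERROR: State.DONE,
}


def step(state: State, ok: bool = True, eof: bool = False) -> State:
    """Pick the next state after the I/O submitted in `state` completes."""
    if not ok:
        # An error while already reporting an error drops the client.
        return State.DONE if state is State.WRITE_ERROR else State.WRITE_ERROR
    if state is State.READ_PLAN and eof:
        return State.DONE  # end of the plan file: nominal disconnect
    return HAPPY_PATH[state]
```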
  366. I need to give a simplified introduction to error handling in this new
  367. programming language before we move on, so let's take a brief detour. In this
  368. language, we require the user to explicitly do *something* about errors.
  369. Generally speaking, there are three somethings that you will do:
  370. - Some context-appropriate response to an error condition
  371. - Bumping the error up to the caller to deal with
  372. - Asserting that the error will never happen in practice
  373. The latter two options have special operators ("?" and "!", respectively, used
  374. as postfix operators on expressions which can fail), and the first option is
  375. handled manually in each situation as appropriate. It's usually most convenient
  376. to use ? to pass errors up the stack, but the buck has got to stop somewhere. In
  377. the code we've seen so far, we're in or near the main function &mdash; the top
  378. of the call stack &mdash; and so have to handle these errors manually, usually
  379. by terminating the program with "!". But, when a client causes an error, we
  380. cannot terminate the program without creating a DoS vulnerability. This
  381. "dispatch" function sets up common client error handling accordingly, allowing
  382. later functions to use the "?" operator to pass errors up to it.
  383. To represent the errors themselves, we use a lightweight approach to tagged
  384. unions, similar to a result type. Each error type, optionally with some extra
  385. metadata, is enumerated, along with any possible successful types, as part of a
  386. function's return type. The only difference between an error type and a normal
  387. type is that the former is denoted with a "!" modifier &mdash; so you can store
  388. any representable state in an error type.
  389. I also wrote an "errors" file which provides uniform error handling for all of
  390. the various error conditions we can expect to occur in this program. This
  391. includes all of the error conditions that we define ourselves, as well as any
  392. errors we expect to encounter from modules we depend on. The result looks like
  393. this:
  394. ```hare
  395. use fs;
  396. use io;
  397. use linux::io_uring;
  398. type unexpected_eof = !void;
  399. type invalid_query = !void;
  400. type no_such_user = !void;
  401. type relay_denied = !void;
  402. type max_query = !void;
  403. type error = !(
  404. io::error |
  405. fs::error |
  406. io_uring::error |
  407. unexpected_eof |
  408. invalid_query |
  409. no_such_user |
  410. relay_denied |
  411. max_query
  412. );
  413. fn strerror(err: error) const str = {
  414. match (err) {
  415. err: io::error => io::strerror(err),
  416. err: fs::error => fs::strerror(err),
  417. err: io_uring::error => io_uring::strerror(err),
  418. unexpected_eof => "Unexpected EOF",
  419. invalid_query => "Invalid query",
  420. no_such_user => "No such user",
  421. relay_denied => "Relay access denied",
  422. max_query => "Maximum query length exceeded",
  423. };
  424. };
  425. ```
  426. With an understanding of error handling, we can re-read the dispatch function's
  427. common error handling for all client issues:
  428. ```hare
  429. fn dispatch(ctx: *context, client: *client, cqe: *io_uring::cqe) void = {
  430. match (switch (client.state) {
  431. state::READ_QUERY => client_query(ctx, client, cqe),
  432. state::OPEN_PLAN => client_open_plan(ctx, client, cqe),
  433. state::READ_PLAN => client_read_plan(ctx, client, cqe),
  434. state::WRITE_RESP, state::WRITE_ERROR =>
  435. client_write_resp(ctx, client, cqe),
  436. }) {
  437. err: error => disconnect_err(ctx, client, err),
  438. void => void,
  439. };
  440. };
  441. ```
  442. Each dispatched-to function returns a tagged union of (void | error), the latter
  443. being our common error type. If they return void, we do nothing, but if an error
  444. occurred, we call "disconnect_err".
  445. ```hare
  446. fn disconnect_err(ctx: *context, client: *client, err: error) void = {
  447. fmt::errorfln("{}: Disconnecting with error: {}",
  448. ip::string(client.addr), strerror(err))!;
  449. const forward = match (err) {
  450. (unexpected_eof | invalid_query | no_such_user
  451. | relay_denied | max_query) => true,
  452. * => false,
  453. };
  454. if (!forward) {
  455. disconnect(ctx, client);
  456. return;
  457. };
  458. client.buf = client.xbuf[..];
  459. const s = fmt::bsprintf(client.buf, "Error: {}\r\n", strerror(err));
  460. client.buf = client.buf[..len(s)];
  461. client.state = state::WRITE_ERROR;
  462. submit_write(ctx, client, client.fd);
  463. };
  464. fn disconnect(ctx: *context, client: *client) void = {
  465. const sqe = io_uring::must_get_sqe(ctx.uring);
  466. io_uring::close(sqe, client.fd);
  467. if (client.plan_fd != -1) {
  468. const sqe = io_uring::must_get_sqe(ctx.uring);
  469. io_uring::close(sqe, client.plan_fd);
  470. };
  471. let i = 0z;
  472. for (i < len(ctx.clients); i += 1) {
  473. if (ctx.clients[i] == client) {
  474. break;
  475. };
  476. };
  477. delete(ctx.clients[i]);
  478. free(client);
  479. };
  480. ```
  481. We log the error here, and for certain kinds of errors, we "forward" them to the
  482. client by writing them to our client buffer and going into the "WRITE_ERROR"
  483. state. For other errors, we just drop the connection.
  484. The disconnect function, which disconnects the client immediately, queues
  485. io_uring submissions to close the open file descriptors associated with it, and
  486. then removes it from the list of clients.
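The forwarding policy can be mirrored in Python with exception classes standing in for the error types. This is an analogy, not a translation: Python has no tagged unions, and the class names and `error_reply` helper below are invented for the example. The messages are copied from the strerror function above.

```python
from typing import Optional


class FingerError(Exception):
    """Base for protocol-level errors that are safe to show to the client."""
    message = "Finger error"

    def __str__(self) -> str:
        return self.message


class UnexpectedEOF(FingerError):
    message = "Unexpected EOF"


class InvalidQuery(FingerError):
    message = "Invalid query"


class NoSuchUser(FingerError):
    message = "No such user"


class RelayDenied(FingerError):
    message = "Relay access denied"


class MaxQuery(FingerError):
    message = "Maximum query length exceeded"


def error_reply(err: Exception) -> Optional[bytes]:
    """Return the line to send to the client, or None to just disconnect."""
    if isinstance(err, FingerError):
        return f"Error: {err}\r\n".encode("utf-8")
    return None  # I/O and filesystem errors are logged but not forwarded
```

Keeping system errors out of the reply avoids leaking details such as file paths or errno text to remote users.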
  487. Let's get back to the happy path. Remember the read SQE we submitted when the
  488. client established the connection? When the CQE comes in, the state machine
  489. directs us into this function:
  490. ```hare
  491. fn client_query(ctx: *context, client: *client, cqe: *io_uring::cqe) (void | error) = {
  492. const r = io_uring::result(cqe)?;
  493. if (r <= 0) {
  494. return unexpected_eof;
  495. };
  496. const r = r: size;
  497. if (len(client.buf) + r > len(client.xbuf) / 2) {
  498. return max_query;
  499. };
  500. client.buf = client.xbuf[..len(client.buf) + r];
  501. // The RFC requires queries to use CRLF, but it is also one of the few
  502. // RFCs which explicitly reminds you to, quote, "as with anything in the
  503. // IP protocol suite, 'be liberal in what you accept'", so we accept LF
  504. // as well.
  505. let lf = match (bytes::index(client.buf, '\n')) {
  506. z: size => z,
  507. void => {
  508. if (len(client.buf) == len(client.xbuf) / 2) {
  509. return max_query;
  510. };
  511. submit_read(ctx, client, client.fd, 0);
  512. return;
  513. },
  514. };
  515. if (lf > 0 && client.buf[lf - 1] == '\r': u8) {
  516. lf -= 1; // CRLF
  517. };
  518. const query = match (strings::try_fromutf8(client.buf[..lf])) {
  519. * => return invalid_query,
  520. q: str => q,
  521. };
  522. fmt::printfln("{}: finger {}", ip::string(client.addr), query)!;
  523. const plan = process_query(ctx, query)?;
  524. defer free(plan);
  525. client.plan_path = strings::to_c(plan);
  526. const sqe = io_uring::must_get_sqe(ctx.uring);
  527. io_uring::openat(sqe, rt::AT_FDCWD, client.plan_path, rt::O_RDONLY, 0);
  528. io_uring::set_user(sqe, client);
  529. client.state = state::OPEN_PLAN;
  530. };
  531. ```
  532. The first half of this function figures out if we've received a full line,
  533. including CRLF. The second half parses this line as a finger query and prepares
  534. to fulfill the enclosed request.
  535. The read operation behaves like the read(2) syscall, which returns 0 on EOF. We
  536. aren't expecting an EOF in this state, so if we see this, we boot them out. We
  537. also have a cap on our buffer length, so we return the max_query error if it's
  538. been exceeded. Otherwise, we look for a line feed. If there isn't one, we submit
  539. another read to get more from the client, but if a line feed is there, we trim
  540. off a carriage return (if present) and decode the completed query as a UTF-8
  541. string.
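The line-assembly logic described above can be sketched in Python as a pure function over the bytes received so far. The names are made up for this example, and `MAX_QUERY` assumes the 2048-byte buffer from the client struct, of which half is used for reads.

```python
from typing import Optional

MAX_QUERY = 1024  # half of the 2048-byte client buffer


class MaxQuery(Exception):
    pass


class InvalidQuery(Exception):
    pass


def extract_query(buf: bytes) -> Optional[str]:
    """Return the query once a full line has arrived, or None if more data is needed."""
    if len(buf) > MAX_QUERY:
        raise MaxQuery()
    lf = buf.find(b"\n")
    if lf == -1:
        if len(buf) == MAX_QUERY:
            raise MaxQuery()  # buffer is full and still no line ending
        return None  # caller should submit another read
    # Accept both CRLF and a bare LF as the line terminator.
    end = lf - 1 if lf > 0 and buf[lf - 1] == 0x0D else lf
    try:
        return buf[:end].decode("utf-8")
    except UnicodeDecodeError:
        raise InvalidQuery() from None
```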
  542. We call "process_query" (using the error propagation operator to bubble up
  543. errors), which returns the path to the requested user's ~/.plan file. We'll look
  544. at the guts of that function in a moment. The return value is heap allocated, so
  545. we defer a free for later.
  546. Strings in our language are not null terminated, but io_uring expects them to
  547. be. This is another case which will be addressed transparently once we build a
  548. higher-level, portable interface. For now, though, we need to call
  549. "strings::to_c" ourselves, and stash it on the client struct. It's heap
  550. allocated, so we'll free it in the next state when the I/O submission completes.
  551. Speaking of which, we finish this process after preparing the next I/O operation
  552. &mdash; opening the plan file &mdash; and setting the client state to the next
  553. step in the state machine.
  554. Before we move on, though, I promised that we'd talk about the process_query
  555. function. Here it is in all of its crappy glory:
  556. ```hare
  557. use path;
  558. use strings;
  559. use unix::passwd;
  560. fn process_query(ctx: *context, q: str) (str | error) = {
  561. if (strings::has_prefix(q, "/W") || strings::has_prefix(q, "/w")) {
  562. q = strings::sub(q, 2, strings::end);
  563. for (strings::has_prefix(q, " ") || strings::has_prefix(q, "\t")) {
  564. q = strings::sub(q, 1, strings::end);
  565. };
  566. };
  567. if (strings::contains(q, '@')) {
  568. return relay_denied;
  569. };
  570. const user = q;
  571. const pwent = match (passwd::getuser(user)) {
  572. void => return no_such_user,
  573. p: passwd::pwent => p,
  574. };
  575. defer passwd::pwent_finish(pwent);
  576. let enabled = false;
  577. for (let i = 0z; i < len(ctx.users); i += 1) {
  578. if (user == ctx.users[i]) {
  579. enabled = true;
  580. break;
  581. };
  582. };
  583. if (!enabled) {
  584. return no_such_user;
  585. };
  586. return path::join(pwent.homedir, ".plan");
  587. };
  588. ```
  589. The [grammar described in RFC 1288][grammar] is pretty confusing, but most of it
  590. is to support features I'm not interested in for this simple implementation,
  591. like relaying to other finger hosts or requesting additional information. I
  592. think I've "parsed" most of the useful bits here, and ultimately I'm aiming to
  593. end up with a single string: the username whose details we want. I grab the
  594. user's passwd entry and check if they're a member of the "finger" group we
  595. populated way up there in the first code sample. If so, we pull the path to
  596. their homedir out of the passwd entry, join it with ".plan", and send it up the
  597. chain.
  598. [grammar]: https://datatracker.ietf.org/doc/html/rfc1288#section-2.3
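For comparison, the same subset of the grammar fits in a few lines of Python. The `homedirs` dictionary is a hypothetical stand-in for the passwd lookup, and `enabled_users` for the member list of the finger group; neither reflects a real API.

```python
import posixpath


class RelayDenied(Exception):
    pass


class NoSuchUser(Exception):
    pass


def parse_user(query: str) -> str:
    """Strip an optional /W (verbose) prefix and refuse relay requests."""
    if query[:2] in ("/W", "/w"):
        query = query[2:].lstrip(" \t")
    if "@" in query:
        raise RelayDenied()
    return query


def plan_path(query: str, enabled_users: set, homedirs: dict) -> str:
    """Map a query to the path of the user's .plan file, if they opted in."""
    user = parse_user(query)
    if user not in homedirs or user not in enabled_users:
        # Same error either way, so outsiders cannot probe for account names.
        raise NoSuchUser()
    return posixpath.join(homedirs[user], ".plan")
```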
  599. At this point we've received, validated, and parsed the client's query, and
  600. looked up the plan file we need. The next step is to open the plan file, which
  601. is where we left off at the end of the last function. The I/O we prepared there
  602. takes us here when it completes:
  603. ```hare
  604. fn client_open_plan(
  605. ctx: *context,
  606. client: *client,
  607. cqe: *io_uring::cqe,
  608. ) (void | error) = {
  609. free(client.plan_path);
  610. client.plan_fd = io_uring::result(cqe)?;
  611. client.buf = client.xbuf[..0];
  612. client.state = state::READ_PLAN;
  613. submit_read(ctx, client, client.plan_fd, -1);
  614. };
  615. ```
  616. By now, this should be pretty comprehensible. I will clarify what the "[..0]"
  617. syntax does here, though. This language has slices, which store a pointer to an
  618. array, a length, and a capacity. In our client state, xbuf is a fixed-length
  619. array which provides the actual storage, and "buf" is a slice of that array,
  620. which acts as a kind of cursor, telling us what portion of the buffer is valid.
  621. The result of this expression is to take a slice up to, but not including, the
  622. 0th item of that array &mdash; in other words, an empty slice. The address and
  623. capacity of the slice still reflect the traits of the underlying array, however,
  624. which is what we want.
  625. We're now ready to read data out of the user's plan file. We submit a read
  626. operation for that file descriptor, and when it completes, we'll end up here:
```hare
fn client_read_plan(
	ctx: *context,
	client: *client,
	cqe: *io_uring::cqe,
) (void | error) = {
	const r = io_uring::result(cqe)?;
	if (r == 0) {
		disconnect(ctx, client);
		return;
	};
	client.buf = client.xbuf[..r];

	// Convert LF to CRLF
	//
	// We always read a maximum of the length of xbuf over two so that we
	// have room to insert these.
	let seencrlf = false;
	for (let i = 0z; i < len(client.buf); i += 1) {
		switch (client.buf[i]) {
			'\r' => seencrlf = true,
			'\n' => {
				if (!seencrlf) {
					static insert(client.buf[i], '\r');
					i += 1;
				};
				// Reset so that a later bare LF (as in
				// "\r\n\n") is still converted.
				seencrlf = false;
			},
			* => seencrlf = false,
		};
	};

	client.state = state::WRITE_RESP;
	submit_write(ctx, client, client.fd);
};
```
Again, the read operation for io_uring behaves similarly to the read(2) syscall,
so it returns the number of bytes read. If this is zero, meaning EOF, we can
terminate the state machine and disconnect the client (this is a nominal
disconnect, so we don't use disconnect_err here). If it's nonzero, we set our
buffer slice to the subset of the buffer which represents the data io_uring has
read.

The Finger RFC requires all data to use CRLF for line endings, and this is where
we deal with it. Remember earlier when I noted that we only ever used half of
the read buffer? This is why: if we read 1024 newlines from the plan file, we
will need another 1024 bytes to insert carriage returns. Because we've planned
for and measured out our memory requirements in advance, we can use "static
insert" here. This built-in works similarly to how insert normally works, but it
will never re-allocate the underlying array. Instead, it asserts that the
insertion would not require a re-allocation, and if it turns out that you did
the math wrong, it aborts the program instead of overflowing the buffer. But, we
did the math and it works out, so it saves us from an extra allocation.
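
The buffer arithmetic is easy to check with a small sketch, again in Python rather than the language of this post. The function name `lf_to_crlf` and its `capacity` parameter are hypothetical; the explicit `BufferError` stands in for the abort that "static insert" performs, and the sketch assumes the flag is cleared after every byte that is not a carriage return.

```python
def lf_to_crlf(data: bytes, capacity: int) -> bytes:
    """Insert '\\r' before every bare '\\n' without growing past capacity."""
    out = bytearray(data)
    prev_cr = False  # was the previous input byte a carriage return?
    i = 0
    while i < len(out):
        byte = out[i]
        if byte == 0x0A and not prev_cr:
            # Stand-in for "static insert": fail loudly rather than grow.
            if len(out) >= capacity:
                raise BufferError("insertion would exceed the fixed buffer")
            out.insert(i, 0x0D)
            i += 1  # step over the '\r' we just inserted
        prev_cr = byte == 0x0D
        i += 1
    return bytes(out)
```

In the worst case every byte read is a newline, so reading at most `capacity // 2` bytes guarantees the check never fires: `lf_to_crlf(b"\n" * 4, 8)` fills the 8-byte budget exactly, while five newlines against the same budget raises.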

Capping this off, we submit a write to transmit this buffer to the client.
"submit_write" is quite similar to submit_read:
```hare
fn submit_write(ctx: *context, client: *client, fd: int) void = {
	const sqe = io_uring::must_get_sqe(ctx.uring);
	io_uring::write(sqe, fd, client.buf: *[*]u8, len(client.buf),
		0, flags::IO_LINK);
	io_uring::set_user(sqe, client);

	let ts = rt::timespec { ... };
	time::instant_to_timespec(client.deadline, &ts);
	const sqe = io_uring::must_get_sqe(ctx.uring);
	io_uring::link_timeout(sqe, &ts, timeout_flags::ABS);
};
```
Ideally, this should not require explanation. From here we transition to the
WRITE_RESP state, so when the I/O completes we end up here:
```hare
fn client_write_resp(
	ctx: *context,
	client: *client,
	cqe: *io_uring::cqe,
) (void | error) = {
	const r = io_uring::result(cqe)?: size;
	if (r < len(client.buf)) {
		client.buf = client.buf[r..];
		submit_write(ctx, client, client.fd);
		return;
	};

	if (client.state == state::WRITE_ERROR) {
		disconnect(ctx, client);
		return;
	};

	client.buf = client.xbuf[..0];
	client.state = state::READ_PLAN;
	submit_read(ctx, client, client.plan_fd, -1);
};
```
First, we check if we need to repeat this process: if we have written less than
the size of the buffer, then we advance the slice by that much and submit
another write.
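
The same short-write handling can be sketched synchronously in Python. This is only an analogy: the server resubmits from a completion handler rather than looping, and `write_all` and `ShortWriter` are hypothetical names made up for the example. `ShortWriter` plays the part of a socket that accepts only a few bytes per call.

```python
from typing import Callable


def write_all(write: Callable[[bytes], int], data: bytes) -> int:
    """Write all of data, resubmitting the unsent tail after short writes.

    Returns the number of write calls that were needed.
    """
    buf = memoryview(data)
    attempts = 0
    while len(buf) > 0:
        written = write(bytes(buf))
        buf = buf[written:]  # advance the slice past what was accepted
        attempts += 1
    return attempts


class ShortWriter:
    """A fake sink that accepts at most `limit` bytes per call."""

    def __init__(self, limit: int):
        self.limit = limit
        self.received = bytearray()

    def __call__(self, chunk: bytes) -> int:
        accepted = chunk[:self.limit]
        self.received += accepted
        return len(accepted)
```

With a sink that takes four bytes at a time, a ten-byte buffer needs three writes (4, 4, then 2 bytes), and the sink ends up with every byte in order.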

We can arrive at the next bit for two reasons: because "client.buf" includes a
fragment of a plan file which has been transmitted to the client, which we just
covered, or because it is the error message buffer prepared by "disconnect_err",
which we discussed earlier. The dispatch function will bring us here for both
the normal and error states, and we distinguish between them with this second if
statement. If we're sending the plan file, we submit a read for the next
buffer-ful of plan. But, our error messages always fit into one buffer, so once
that buffer has been written in full, we can just disconnect in the error case.
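
The order of those checks can be written down as a small pure function. This is a Python sketch of the decision only, not the server's code: the `Action` enum and the name `on_write_complete` are hypothetical, and `State` lists just the two states that reach this handler.

```python
from enum import Enum, auto


class State(Enum):
    WRITE_RESP = auto()   # sending a chunk of the plan file
    WRITE_ERROR = auto()  # sending a one-shot error message


class Action(Enum):
    WRITE_REST = auto()   # short write: resend the unsent tail
    DISCONNECT = auto()   # error message fully sent, hang up
    READ_MORE = auto()    # plan chunk fully sent, read the next one


def on_write_complete(state: State, written: int, pending: int) -> Action:
    """Pick the next step after a write of `written` out of `pending` bytes."""
    if written < pending:
        return Action.WRITE_REST  # applies to both states
    if state is State.WRITE_ERROR:
        return Action.DISCONNECT
    return Action.READ_MORE
```

The short-write check comes first on purpose: even an error message may need more than one write to go out in full before the client is disconnected.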

And that's it! That completes our state machine, and I'm pretty sure we've read
the entire program's source code by this point. Pretty neat, huh? io_uring is
quite interesting. I plan on using this as a little platform upon which I can
further test our io_uring implementation and develop a portable async I/O
abstraction. We haven't implemented a DNS resolver for the stdlib yet, but I'll
also be writing a finger client (using synchronous I/O this time) once we do.

If we really wanted to max out the performance for a CLOUD SCALE WEB 8.0 XTREME
PERFORMANCE finger server, we could try a few additional improvements:

- Adding an internal queue for clients until we have room for their I/O in the SQ
- Using a shared buffer pool with the kernel, with io_uring ops like READ_FIXED
- Batching requests for the same plan file by only answering requests for it
  every N milliseconds (known to some as the "data loader" pattern)
- More slow loris mitigations, such as limiting open connections per IP address
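
As one example from that list, a per-address connection cap needs very little state. The following is a rough Python sketch under stated assumptions, not something the server does today: `ConnectionLimiter`, `try_accept`, and `release` are hypothetical names, and the limit of two in the usage note is arbitrary.

```python
class ConnectionLimiter:
    """Track open connections per address and refuse any over the cap."""

    def __init__(self, max_per_ip: int):
        self.max_per_ip = max_per_ip
        self.open: dict[str, int] = {}

    def try_accept(self, ip: str) -> bool:
        """Return True and count the connection if ip is under the cap."""
        count = self.open.get(ip, 0)
        if count >= self.max_per_ip:
            return False
        self.open[ip] = count + 1
        return True

    def release(self, ip: str) -> None:
        """Call on disconnect so the address regains a slot."""
        count = self.open.get(ip, 0)
        if count <= 1:
            self.open.pop(ip, None)  # drop idle addresses to bound memory
        else:
            self.open[ip] = count - 1
```

A server would call `try_accept` right after accepting a connection, close the socket immediately if it returns False, and call `release` from the disconnect path. With `ConnectionLimiter(2)`, a third simultaneous connection from one address is refused while other addresses are unaffected.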

It would also be cool to handle SIGHUP to reload our finger group membership
list without restarting the daemon. I would say "patches welcome", but I won't
share the git repo until the language is ready. And the code is GPL'd, but not
AGPL'd, so you aren't entitled to it if you finger me!