Skip to main content

Coroutines and async calls

This guide describes how to make asynchronous calls from Rspamd plugins and rules.

Overview

Prior to 1.8.0, if you needed to perform an action involving network request (i.e. Redis query, Anti-virus scan), you had to use callback-style approach. You define callback and initiate an asynchronous request and stop the execution to allow other tasks proceed.

As soon as request is completed, callback is called.

  -- define a callback
local function request_done(err, code, body)
if not err then
task:insert_result('REQUEST_DONE', 1.0, body)
end
...
end

-- initiate the request
api.start_request({
callback = request_done,
...
})

Introducing pseudo-synchronous API

Rspamd 1.8.0 introduces a new pseudo-synchronous API. Now you can write code in a typical imperative manner without blocking other tasks.

Each operation that could potentially block creates a yielding point. Consequently, the code is paused until the operation is completed (similar to blocking), and it resumes only when there is a result. Meanwhile, other tasks are processed as usual.

**Please note** that synchronous mode requires symbol to be registered with **`coro`** flag from the version 1.9 (see "full example").
  local err, response = api.do_request(...)

if not err then
task:insert_result('REQUEST_DONE', 1.0, response)
end
...

API example

HTTP module

To use Sync with HTTP API, just remove callback parameter from call parameters. It returns two values:

  • err nil or string containing error description if network or internal error happened
  • response nil if error happened (note: HTTP-codes are returned with corresponding codes) or table:
    • code int HTTP response code
    • content string Response body
    • headers table (header -> value) list of response headers

Asynchronous HTTP request

{:.no_toc}

  -- define a callback
local function request_done(err, code, body)
if not err then
task:insert_result('HTTP_RESPONSE' .. code, 1.0, body)
end
...
end

-- initiate the request
rspamd_http.request({
url = 'http://127.0.0.1:18080/abc',
callback = request_done,
...
})

-- standard includes local rspamd_http = require "rspamd_http" local rspamd_logger = require "rspamd_logger"

local function http_symbol(task)

-- define a callback local function request_done(err, code, body) if err then rspamd_logger.errx('http_callback error: ' .. err) task:insert_result('HTTP_ERROR', 1.0, err) else task:insert_result('HTTP_RESPONSE', 1.0, body) end end

-- initiate the request rspamd_http.request({ url = 'http://127.0.0.1:18080/abc', task = task, callback = request_done, }) end

rspamd_config:register_symbol({ name = 'SIMPLE_HTTP', score = 1.0, callback = http_symbol, })

Synchronous HTTP request

{:.no_toc}

**Please note** that synchronous mode requires symbol to be registered with **coro** flag (see "full example").
  local err, response = rspamd_http.request({
url = 'http://127.0.0.1:18080/abc',
...
})

if not err then
task:insert_result('HTTP_SYNC', 1.0, response.content)
end
...

local rspamd_http = require "rspamd_http" local rspamd_logger = require "rspamd_logger"

local function http_symbol(task) -- start the request local err, response = rspamd_http.request({ url = 'http://127.0.0.1:18080' .. url, task = task, method = 'post', timeout = 1, })

rspamd_logger.errx(task, 'rspamd_http.request[done] err: %1 response:%2', err, response)

-- check response if err then rspamd_logger.errx('http error: ' .. err) task:insert_result('HTTP_ERROR', 1.0, err) else task:insert_result('HTTP_RESPONSE', 1.0, response.content) end end

rspamd_config:register_symbol({ name = 'SIMPLE_HTTP', score = 1.0, callback = http_symbol, -- Symbol using Synchronous API should have "coro" flag. flags = 'coro', })

DNS module

To work with DNS properly, a new module called rspamd_dns has been introduced, which replaces the former task:get_resolver() calls. The new API requires explicit specification of the type of request, rather than providing a set of resolve_* methods.

Asynchronous DNS request

{:.no_toc}

local function dns_callback(_, to_resolve, results, err)
if not err then
...
end
end

task:get_resolver():resolve_a({
name = 'rspamd.com'
callback = dns_callback,
...
})

local rspamd_dns = require "rspamd_dns" local logger = require "rspamd_logger"

local function dns_symbol(task) local function dns_cb(, to_resolve, results, err) logger.errx(task, "=%1, to_resolve=%2, results=%3, err%4", _, to_resolve, results, err) if err then task:insert_result('DNS_ERROR', 1.0, err) else task:insert_result('DNS', 1.0, tostring(results[1])) end end

task:get_resolver():resolve_a({ task = task, name = 'rspamd.com', callback = dns_cb }) end

rspamd_config:register_symbol({ name = 'SIMPLE_DNS', score = 1.0, callback = dns_symbol, })

Synchronous DNS request

{:.no_toc}

**Please note** that synchronous mode requires symbol to be registered with **coro** flag (see "full example").
  local is_ok, results = rspamd_dns.request({
type = 'a',
name = to_resolve ,
...
})
if is_ok then
task:insert_result('DNS_SYNC', 1.0, tostring(results[1]))
end

local rspamd_dns = require "rspamd_dns" local logger = require "rspamd_logger"

local function dns_sync_symbol(task) local to_resolve = tostring(task:get_request_header('to-resolve')) local is_ok, results = rspamd_dns.request({ task = task, type = 'a', name = to_resolve , })

logger.errx(task, "is_ok=%1, results=%2, results[1]=%3", is_ok, results, results[1])

if not is_ok then task:insert_result('DNS_SYNC_ERROR', 1.0, results) else task:insert_result('DNS_SYNC', 1.0, tostring(results[1])) end end

rspamd_config:register_symbol({ name = 'SIMPLE_DNS_SYNC', score = 1.0, callback = dns_sync_symbol, -- Symbol using Synchronous API should have "coro" flag. flags = 'coro', })

TCP module

It is recommended to use lua_tcp_sync module to work TCP.

Asynchronous TCP request

{:.no_toc}

  local function http_read_cb(err, data, conn)
task:insert_result('HTTP_ASYNC_RESPONSE', 1.0, data or err)
...
end
rspamd_tcp:request({
callback = http_read_cb,
host = '127.0.0.1',
data = {'GET /request HTTP/1.1\r\nConnection: keep-alive\r\n\r\n'},
...
})

local rspamd_tcp = require "rspamd_tcp" local logger = require "rspamd_logger"

local function http_simple_tcp_async_symbol(task) logger.errx(task, 'http_tcp_symbol: begin') local function http_read_cb(err, data, conn) logger.errx(task, 'http_read_cb: got reply: %s, error: %s, conn: %s', data, err, conn) task:insert_result('HTTP_ASYNC_RESPONSE', 1.0, data or err) -- if we want to send another request -- conn:add_write(http_read_post_cb, "POST /request2 HTTP/1.1\r\n\r\n") end rspamd_tcp:request({ task = task, callback = http_read_cb, host = '127.0.0.1', data = {'GET /request HTTP/1.1\r\nConnection: keep-alive\r\n\r\n'}, read = true, port = 18080, }) end

rspamd_config:register_symbol({ name = 'SIMPLE_TCP_ASYNC_TEST', score = 1.0, callback = http_simple_tcp_async_symbol, -- Symbol using Synchronous API should have "coro" flag. flags = 'coro', })

Synchronous TCP request

{:.no_toc}

**Please note** that synchronous mode requires symbol to be registered with **coro** flag (see "full example").
  local is_ok, connection = tcp_sync.connect {
host = '127.0.0.1',
...
}

if not is_ok then
logger.errx(task, 'write error: %1', connection)
end

logger.errx(task, 'connect_sync %1, %2', is_ok, tostring(connection))

is_ok, err = connection:write('GET /request_sync HTTP/1.1\r\nConnection: keep-alive\r\n\r\n')
if not is_ok then
logger.errx(task, 'write error: %1', err)
end

is_ok, data = connection:read_once();
task:insert_result('HTTP_RESPONSE', 1.0, data or err)

local logger = require "rspamd_logger" local tcp_sync = require "lua_tcp_sync"

local function http_tcp_symbol(task)

local err local is_ok, connection = tcp_sync.connect { task = task, host = '127.0.0.1', timeout = 20, port = 18080, }

logger.errx(task, 'connect_sync %1, %2', is_ok, tostring(connection)) if not is_ok then logger.errx(task, 'connect error: %1', connection) return end

is_ok, err = connection:write(string.format('GET /request_sync HTTP/1.1\r\nConnection: close\r\n\r\n'))

if not is_ok then logger.errx(task, 'write error: %1', err) return end

local content_length, content

while true do local header_line is_ok, header_line = connection:read_until("\r\n") if not is_ok then logger.errx(task, 'failed to get header: %1', header_line) return end

if header_line == "" then
logger.errx(task, 'headers done')
break
end

local value
local header = header_line:gsub("([%w-]+): (.*)",
function (h, v) value = v; return h:lower() end)

logger.errx(task, 'parsed header: %1 -> "%2"', header, value)

if header == "content-length" then
content_length = tonumber(value)
end

end

if content_length then is_ok, content = connection:read_bytes(content_length) if is_ok then task:insert_result('HTTP_SYNC_CONTENT', 1.0, content) end else is_ok, content = connection:read_until_eof() if is_ok then task:insert_result('HTTP_SYNC_EOF', 1.0, content) end end logger.errx(task, '(is_ok: %1) content [%2 bytes] %3', is_ok, content_length, content) end

rspamd_config:register_symbol({ name = 'HTTP_TCP_TEST', score = 1.0, callback = http_tcp_symbol, -- Symbol using Synchronous API should have "coro" flag. flags = 'coro', })

Redis module

Asynchronous Redis request

{:.no_toc}

  local function redis_cb(err, data)
if not err then
task:insert_result('REDIS_ASYNC201809_ERROR', 1.0, err)
end
...
end

local attrs = {
callback = redis_cb
...
}
local request = {...}
redis_lua.request(redis_params, attrs, request)

local logger = require "rspamd_logger" local redis_lua = require "lua_redis" local lua_util = require "lua_util" local redis_params local N = 'redis_test'

local function redis_simple_async_api(task) local function redis_cb(err, data) if err then task:insert_result('REDIS_ASYNC_ERROR', 1.0, err) else task:insert_result('REDIS_ASYNC', 1.0, data) end end

local attrs = { task = task, callback = redis_cb } local request = { 'GET', 'test_key' } redis_lua.request(redis_params, attrs, request) end

redis_params = rspamd_parse_redis_server(N)

rspamd_config:register_symbol({ name = 'SIMPLE_REDIS_ASYNC_TEST', score = 1.0, callback = redis_simple_async_api, -- Symbol using Synchronous API should have "coro" flag. flags = 'coro', })

Synchronous Redis request

{:.no_toc}

**Please note** that synchronous mode requires symbol to be registered with **coro** flag (see "full example").
  local is_ok, connection = redis_lua.connect(...)
if not is_ok then
return
end

is_ok, err = connection:add_cmd('EVAL', {[[return "hello from lua on redis"]], 0})

if not is_ok then
return
end

is_ok,data = connection:exec()
if is_ok then
task:insert_result('REDIS', 1.0, data)
end
...

local logger = require "rspamd_logger" local redis_lua = require "lua_redis"

local redis_params local N = 'redis_test'

local function redis_symbol(task)

local attrs = {task = task} local is_ok, connection = redis_lua.connect(redis_params, attrs)

logger.infox(task, "connect: %1, %2", is_ok, connection)

if not is_ok then task:insert_result('REDIS_ERROR', 1.0, connection) return end

local err, data

local lua_script = [[return "hello from lua on redis"]]

is_ok, err = connection:add_cmd('EVAL', {lua_script, 0}) logger.infox(task, "add_cmd: %1, %2", is_ok, err)

if not is_ok then task:insert_result('REDIS_ERROR_2', 1.0, err) return end

is_ok,data = connection:exec()

logger.infox(task, "exec: %1, %2", is_ok, data)

if not is_ok then task:insert_result('REDIS_ERROR_3', 1.0, data) return end

task:insert_result('REDIS', 1.0, data)

end

redis_params = rspamd_parse_redis_server(N)

rspamd_config:register_symbol({ name = 'REDIS_TEST', score = 1.0, callback = redis_symbol, -- Symbol using Synchronous API should have "coro" flag. flags = 'coro', })