Coroutines and async calls
This guide describes how to make asynchronous calls from Rspamd plugins and rules.
Overview
Prior to 1.8.0, if you needed to perform an action involving network request (i.e. Redis query, Anti-virus scan), you had to use callback-style approach. You define callback and initiate an asynchronous request and stop the execution to allow other tasks proceed.
As soon as request is completed, callback is called.
-- define a callback
local function request_done(err, code, body)
if not err then
task:insert_result('REQUEST_DONE', 1.0, body)
end
...
end
-- initiate the request
api.start_request({
callback = request_done,
...
})
Introducing pseudo-synchronous API
Rspamd 1.8.0 introduces a new pseudo-synchronous API. Now you can write code in a typical imperative manner without blocking other tasks.
Each operation that could potentially block creates a yielding point. Consequently, the code is paused until the operation is completed (similar to blocking), and it resumes only when there is a result. Meanwhile, other tasks are processed as usual.
**Please note** that synchronous mode requires symbol to be registered with **`coro`** flag from the version 1.9 (see "full example"). local err, response = api.do_request(...)
if not err then
task:insert_result('REQUEST_DONE', 1.0, response)
end
...
API example
HTTP module
To use Sync with HTTP API, just remove callback parameter from call parameters. It returns two values:
- err
nil
orstring
containing error description if network or internal error happened - response
nil
if error happened (note: HTTP-codes are returned with corresponding codes) ortable
:- code
int
HTTP response code - content
string
Response body - headers
table
(header -> value) list of response headers
- code
Asynchronous HTTP request
{:.no_toc}
-- define a callback
local function request_done(err, code, body)
if not err then
task:insert_result('HTTP_RESPONSE' .. code, 1.0, body)
end
...
end
-- initiate the request
rspamd_http.request({
url = 'http://127.0.0.1:18080/abc',
callback = request_done,
...
})
-- standard includes local rspamd_http = require "rspamd_http" local rspamd_logger = require "rspamd_logger"
local function http_symbol(task)
-- define a callback local function request_done(err, code, body) if err then rspamd_logger.errx('http_callback error: ' .. err) task:insert_result('HTTP_ERROR', 1.0, err) else task:insert_result('HTTP_RESPONSE', 1.0, body) end end
-- initiate the request rspamd_http.request({ url = 'http://127.0.0.1:18080/abc', task = task, callback = request_done, }) end
rspamd_config:register_symbol({ name = 'SIMPLE_HTTP', score = 1.0, callback = http_symbol, })
Synchronous HTTP request
{:.no_toc}
**Please note** that synchronous mode requires symbol to be registered with **coro** flag (see "full example"). local err, response = rspamd_http.request({
url = 'http://127.0.0.1:18080/abc',
...
})
if not err then
task:insert_result('HTTP_SYNC', 1.0, response.content)
end
...
local rspamd_http = require "rspamd_http" local rspamd_logger = require "rspamd_logger"
local function http_symbol(task) -- start the request local err, response = rspamd_http.request({ url = 'http://127.0.0.1:18080' .. url, task = task, method = 'post', timeout = 1, })
rspamd_logger.errx(task, 'rspamd_http.request[done] err: %1 response:%2', err, response)
-- check response if err then rspamd_logger.errx('http error: ' .. err) task:insert_result('HTTP_ERROR', 1.0, err) else task:insert_result('HTTP_RESPONSE', 1.0, response.content) end end
rspamd_config:register_symbol({ name = 'SIMPLE_HTTP', score = 1.0, callback = http_symbol, -- Symbol using Synchronous API should have "coro" flag. flags = 'coro', })
DNS module
To work with DNS properly, a new module called rspamd_dns
has been introduced, which replaces the former task:get_resolver()
calls. The new API requires explicit specification of the type of request, rather than providing a set of resolve_*
methods.
Asynchronous DNS request
{:.no_toc}
local function dns_callback(_, to_resolve, results, err)
if not err then
...
end
end
task:get_resolver():resolve_a({
name = 'rspamd.com'
callback = dns_callback,
...
})
local rspamd_dns = require "rspamd_dns" local logger = require "rspamd_logger"
local function dns_symbol(task) local function dns_cb(, to_resolve, results, err) logger.errx(task, "=%1, to_resolve=%2, results=%3, err%4", _, to_resolve, results, err) if err then task:insert_result('DNS_ERROR', 1.0, err) else task:insert_result('DNS', 1.0, tostring(results[1])) end end
task:get_resolver():resolve_a({ task = task, name = 'rspamd.com', callback = dns_cb }) end
rspamd_config:register_symbol({ name = 'SIMPLE_DNS', score = 1.0, callback = dns_symbol, })
Synchronous DNS request
{:.no_toc}
**Please note** that synchronous mode requires symbol to be registered with **coro** flag (see "full example"). local is_ok, results = rspamd_dns.request({
type = 'a',
name = to_resolve ,
...
})
if is_ok then
task:insert_result('DNS_SYNC', 1.0, tostring(results[1]))
end
local rspamd_dns = require "rspamd_dns" local logger = require "rspamd_logger"
local function dns_sync_symbol(task) local to_resolve = tostring(task:get_request_header('to-resolve')) local is_ok, results = rspamd_dns.request({ task = task, type = 'a', name = to_resolve , })
logger.errx(task, "is_ok=%1, results=%2, results[1]=%3", is_ok, results, results[1])
if not is_ok then task:insert_result('DNS_SYNC_ERROR', 1.0, results) else task:insert_result('DNS_SYNC', 1.0, tostring(results[1])) end end
rspamd_config:register_symbol({ name = 'SIMPLE_DNS_SYNC', score = 1.0, callback = dns_sync_symbol, -- Symbol using Synchronous API should have "coro" flag. flags = 'coro', })
TCP module
It is recommended to use lua_tcp_sync
module to work TCP.
Asynchronous TCP request
{:.no_toc}
local function http_read_cb(err, data, conn)
task:insert_result('HTTP_ASYNC_RESPONSE', 1.0, data or err)
...
end
rspamd_tcp:request({
callback = http_read_cb,
host = '127.0.0.1',
data = {'GET /request HTTP/1.1\r\nConnection: keep-alive\r\n\r\n'},
...
})
local rspamd_tcp = require "rspamd_tcp" local logger = require "rspamd_logger"
local function http_simple_tcp_async_symbol(task) logger.errx(task, 'http_tcp_symbol: begin') local function http_read_cb(err, data, conn) logger.errx(task, 'http_read_cb: got reply: %s, error: %s, conn: %s', data, err, conn) task:insert_result('HTTP_ASYNC_RESPONSE', 1.0, data or err) -- if we want to send another request -- conn:add_write(http_read_post_cb, "POST /request2 HTTP/1.1\r\n\r\n") end rspamd_tcp:request({ task = task, callback = http_read_cb, host = '127.0.0.1', data = {'GET /request HTTP/1.1\r\nConnection: keep-alive\r\n\r\n'}, read = true, port = 18080, }) end
rspamd_config:register_symbol({ name = 'SIMPLE_TCP_ASYNC_TEST', score = 1.0, callback = http_simple_tcp_async_symbol, -- Symbol using Synchronous API should have "coro" flag. flags = 'coro', })
Synchronous TCP request
{:.no_toc}
**Please note** that synchronous mode requires symbol to be registered with **coro** flag (see "full example"). local is_ok, connection = tcp_sync.connect {
host = '127.0.0.1',
...
}
if not is_ok then
logger.errx(task, 'write error: %1', connection)
end
logger.errx(task, 'connect_sync %1, %2', is_ok, tostring(connection))
is_ok, err = connection:write('GET /request_sync HTTP/1.1\r\nConnection: keep-alive\r\n\r\n')
if not is_ok then
logger.errx(task, 'write error: %1', err)
end
is_ok, data = connection:read_once();
task:insert_result('HTTP_RESPONSE', 1.0, data or err)
local logger = require "rspamd_logger" local tcp_sync = require "lua_tcp_sync"
local function http_tcp_symbol(task)
local err local is_ok, connection = tcp_sync.connect { task = task, host = '127.0.0.1', timeout = 20, port = 18080, }
logger.errx(task, 'connect_sync %1, %2', is_ok, tostring(connection)) if not is_ok then logger.errx(task, 'connect error: %1', connection) return end
is_ok, err = connection:write(string.format('GET /request_sync HTTP/1.1\r\nConnection: close\r\n\r\n'))
if not is_ok then logger.errx(task, 'write error: %1', err) return end
local content_length, content
while true do local header_line is_ok, header_line = connection:read_until("\r\n") if not is_ok then logger.errx(task, 'failed to get header: %1', header_line) return end
if header_line == "" then
logger.errx(task, 'headers done')
break
end
local value
local header = header_line:gsub("([%w-]+): (.*)",
function (h, v) value = v; return h:lower() end)
logger.errx(task, 'parsed header: %1 -> "%2"', header, value)
if header == "content-length" then
content_length = tonumber(value)
end
end
if content_length then is_ok, content = connection:read_bytes(content_length) if is_ok then task:insert_result('HTTP_SYNC_CONTENT', 1.0, content) end else is_ok, content = connection:read_until_eof() if is_ok then task:insert_result('HTTP_SYNC_EOF', 1.0, content) end end logger.errx(task, '(is_ok: %1) content [%2 bytes] %3', is_ok, content_length, content) end
rspamd_config:register_symbol({ name = 'HTTP_TCP_TEST', score = 1.0, callback = http_tcp_symbol, -- Symbol using Synchronous API should have "coro" flag. flags = 'coro', })
Redis module
Asynchronous Redis request
{:.no_toc}
local function redis_cb(err, data)
if not err then
task:insert_result('REDIS_ASYNC201809_ERROR', 1.0, err)
end
...
end
local attrs = {
callback = redis_cb
...
}
local request = {...}
redis_lua.request(redis_params, attrs, request)
local logger = require "rspamd_logger" local redis_lua = require "lua_redis" local lua_util = require "lua_util" local redis_params local N = 'redis_test'
local function redis_simple_async_api(task) local function redis_cb(err, data) if err then task:insert_result('REDIS_ASYNC_ERROR', 1.0, err) else task:insert_result('REDIS_ASYNC', 1.0, data) end end
local attrs = { task = task, callback = redis_cb } local request = { 'GET', 'test_key' } redis_lua.request(redis_params, attrs, request) end
redis_params = rspamd_parse_redis_server(N)
rspamd_config:register_symbol({ name = 'SIMPLE_REDIS_ASYNC_TEST', score = 1.0, callback = redis_simple_async_api, -- Symbol using Synchronous API should have "coro" flag. flags = 'coro', })
Synchronous Redis request
{:.no_toc}
**Please note** that synchronous mode requires symbol to be registered with **coro** flag (see "full example"). local is_ok, connection = redis_lua.connect(...)
if not is_ok then
return
end
is_ok, err = connection:add_cmd('EVAL', {[[return "hello from lua on redis"]], 0})
if not is_ok then
return
end
is_ok,data = connection:exec()
if is_ok then
task:insert_result('REDIS', 1.0, data)
end
...
local logger = require "rspamd_logger" local redis_lua = require "lua_redis"
local redis_params local N = 'redis_test'
local function redis_symbol(task)
local attrs = {task = task} local is_ok, connection = redis_lua.connect(redis_params, attrs)
logger.infox(task, "connect: %1, %2", is_ok, connection)
if not is_ok then task:insert_result('REDIS_ERROR', 1.0, connection) return end
local err, data
local lua_script = [[return "hello from lua on redis"]]
is_ok, err = connection:add_cmd('EVAL', {lua_script, 0}) logger.infox(task, "add_cmd: %1, %2", is_ok, err)
if not is_ok then task:insert_result('REDIS_ERROR_2', 1.0, err) return end
is_ok,data = connection:exec()
logger.infox(task, "exec: %1, %2", is_ok, data)
if not is_ok then task:insert_result('REDIS_ERROR_3', 1.0, data) return end
task:insert_result('REDIS', 1.0, data)
end
redis_params = rspamd_parse_redis_server(N)
rspamd_config:register_symbol({ name = 'REDIS_TEST', score = 1.0, callback = redis_symbol, -- Symbol using Synchronous API should have "coro" flag. flags = 'coro', })