From 073392a3132e107f799328dde13153c3372d12ce Mon Sep 17 00:00:00 2001 From: daurnimator Date: Fri, 24 Aug 2018 02:24:21 +1000 Subject: [PATCH] http/client: WIP connection pooling --- http-scm-0.rockspec | 1 + http/client.lua | 30 +++++++++ http/client_pool.lua | 142 +++++++++++++++++++++++++++++++++++++++++++ http/request.lua | 7 ++- 4 files changed, 178 insertions(+), 2 deletions(-) create mode 100644 http/client_pool.lua diff --git a/http-scm-0.rockspec b/http-scm-0.rockspec index 5c9dafdc..31eec879 100644 --- a/http-scm-0.rockspec +++ b/http-scm-0.rockspec @@ -30,6 +30,7 @@ build = { modules = { ["http.bit"] = "http/bit.lua"; ["http.client"] = "http/client.lua"; + ["http.client_pool"] = "http/client_pool.lua"; ["http.connection_common"] = "http/connection_common.lua"; ["http.cookie"] = "http/cookie.lua"; ["http.h1_connection"] = "http/h1_connection.lua"; diff --git a/http/client.lua b/http/client.lua index e27c2bbb..12ed6d72 100644 --- a/http/client.lua +++ b/http/client.lua @@ -6,6 +6,7 @@ local cqueues_dns = require "cqueues.dns" local cqueues_dns_record = require "cqueues.dns.record" local http_tls = require "http.tls" local http_util = require "http.util" +local http_client_pool = require "http.client_pool" local connection_common = require "http.connection_common" local onerror = connection_common.onerror local new_h1_connection = require "http.h1_connection".new @@ -155,6 +156,9 @@ local record_ipv4_mt = { __name = "http.client.record.ipv4"; __index = record_ipv4_methods; } +function record_ipv4_methods:pool_key() + return http_client_pool.ipv4_pool_key(self.addr, self.port) +end function records_methods:add_v4(addr, port) local n = self.n + 1 self[n] = setmetatable({ addr = addr, port = port }, record_ipv4_mt) @@ -168,6 +172,9 @@ local record_ipv6_mt = { __name = "http.client.record.ipv6"; __index = record_ipv6_methods; } +function record_ipv6_methods:pool_key() + return http_client_pool.ipv6_pool_key(self.addr, self.port) +end function records_methods:add_v6(addr, port) if type(addr) == "string" then -- Normalise @@ -188,6 +195,9 @@ local record_unix_mt = { __name = "http.client.record.unix"; __index = record_unix_methods; } +function record_unix_methods:pool_key() + return http_client_pool.unix_pool_key(self.path) +end function records_methods:add_unix(path) local n = self.n + 1 self[n] = setmetatable({ path = path }, record_unix_mt) @@ -253,6 +263,23 @@ local function connect(options, timeout) local records = lookup_records(options, timeout) + local pool = options.pool + if pool then + for i=1, records.n do + local dst_pool = pool[records[i]:pool_key()] + if dst_pool then + while true do + local c = http_client_pool.find_connection(dst_pool, options) + if not c then + break + end + -- TODO: if c doesn't work, try another one + return c + end + end + end + end + local bind = options.bind if bind ~= nil then assert(type(bind) == "string") @@ -301,6 +328,9 @@ local function connect(options, timeout) local ok ok, lasterr, lasterrno = c:connect(deadline and deadline-monotime()) if ok then + if pool then + pool:add(c) + end return c end c:close() diff --git a/http/client_pool.lua b/http/client_pool.lua new file mode 100644 index 00000000..4710a8e5 --- /dev/null +++ b/http/client_pool.lua @@ -0,0 +1,142 @@ +local cs = require "cqueues.socket" + +local function reuse_connection(candidate, connect_options) + -- Assume family/host/port/path already checked + + if candidate.socket == nil then + return false + end + + if connect_options.v6only then + -- TODO + return false + end + + local bind = connect_options.bind + if bind then + -- TODO: Use :localname() + return false + end + + local version = connect_options.version + if version and version ~= candidate.version then + return false + end + + if candidate.version < 2 then + -- Check if connection already in use (avoid pipelining) + if candidate.req_locked then + return false + end + elseif candidate.version == 2 then + -- Check if http2 connection is nearing end of stream ids + local highest_stream_id = math.max(candidate.highest_odd_stream, candidate.highest_even_stream) + -- The stream id is a unsigned 31bit integer. we don't reuse if it's past half way + if highest_stream_id > 0x3fffffff then + return false + end + + local h2_settings = connect_options.h2_settings + if h2_settings then + -- TODO: check (and possibly change on connection?) + return false + end + end + + -- Do TLS check last, as it is the most expensive + if connect_options.tls then + -- TODO: compare TLS parameters + return false + end + + -- Check to see if connection has been closed + local ok, err = candidate.socket:fill(1, 0) + if not ok and err == nil then + -- has been closed + return false + end + + return true +end + +local pool_methods = {} +local pool_mt = { + __name = "http.client.pool"; + __index = pool_methods; +} + +local function new_pool() + return setmetatable({}, pool_mt) +end + +local function ipv4_pool_key(addr, port) + return string.format("%d:%s:%s", cs.AF_INET, addr, port) +end + +local function ipv6_pool_key(addr, port) + return string.format("%d:[%s]:%s", cs.AF_INET6, addr, port) +end + +local function unix_pool_key(path) + return string.format("%d:%s", cs.AF_UNIX, path) +end + +local function connection_pool_key(connection) + -- XXX: if using a proxy this may not be correct + local family, a, b = connection:peername() + if family == cs.AF_INET then + return ipv4_pool_key(a, b) + elseif family == cs.AF_INET6 then + return ipv6_pool_key(a, b) + elseif family == cs.AF_UNIX then + return unix_pool_key(a) + end +end + +function pool_methods:add(connection) + local key = connection_pool_key(connection) + if not key then + return false + end + local dst_pool = self[key] + if dst_pool == nil then + dst_pool = {} + self[key] = dst_pool + end + dst_pool[connection] = true + return true +end + +function pool_methods:remove(connection) + local key = connection_pool_key(connection) + if not key then + return true + end + local dst_pool = self[key] + if dst_pool == nil then + return true + end + dst_pool[connection] = nil + if next(dst_pool) == nil then + self[key] = nil + end + return true +end + +local function find_connection(dst_pool, connect_options) + for connection in pairs(dst_pool) do + if reuse_connection(connection, connect_options) then + return connection + end + end + return nil +end + +return { + ipv4_pool_key = ipv4_pool_key; + ipv6_pool_key = ipv6_pool_key; + unix_pool_key = unix_pool_key; + + new = new_pool; + find_connection = find_connection; +} diff --git a/http/request.lua b/http/request.lua index dbd727e5..f36a218a 100644 --- a/http/request.lua +++ b/http/request.lua @@ -4,6 +4,7 @@ local uri_patts = require "lpeg_patterns.uri" local basexx = require "basexx" local client = require "http.client" local new_headers = require "http.headers".new +local http_client_pool = require "http.client_pool" local http_cookie = require "http.cookie" local http_hsts = require "http.hsts" local http_socks = require "http.socks" @@ -17,6 +18,7 @@ local default_user_agent = string.format("%s/%s", http_version.name, http_versio local default_hsts_store = http_hsts.new_store() local default_proxies = http_proxies.new():update() local default_cookie_store = http_cookie.new_store() +local default_connection_pool = http_client_pool.new() local default_h2_settings = { ENABLE_PUSH = false; @@ -26,6 +28,7 @@ local request_methods = { hsts = default_hsts_store; proxies = default_proxies; cookie_store = default_cookie_store; + pool = default_connection_pool; is_top_level = true; site_for_cookies = nil; expect_100_timeout = 1; @@ -125,6 +128,7 @@ function request_methods:clone() hsts = rawget(self, "hsts"); proxies = rawget(self, "proxies"); cookie_store = rawget(self, "cookie_store"); + pool = rawget(self, "pool"); is_top_level = rawget(self, "is_top_level"); site_for_cookies = rawget(self, "site_for_cookies"); expect_100_timeout = rawget(self, "expect_100_timeout"); @@ -491,6 +495,7 @@ function request_methods:go(timeout) if not connection then local err, errno connection, err, errno = client.connect({ + pool = self.pool; host = host; port = port; bind = self.bind; @@ -503,8 +508,6 @@ function request_methods:go(timeout) if connection == nil then return nil, err, errno end - -- Close the connection (and free resources) when done - connection:onidle(connection.close) end local stream do