Skip to content

Commit

Permalink
http/client: WIP connection pooling
Browse files Browse the repository at this point in the history
  • Loading branch information
daurnimator committed Aug 23, 2018
1 parent 5a80e99 commit 073392a
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 2 deletions.
1 change: 1 addition & 0 deletions http-scm-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ build = {
modules = {
["http.bit"] = "http/bit.lua";
["http.client"] = "http/client.lua";
["http.client_pool"] = "http/client_pool.lua";
["http.connection_common"] = "http/connection_common.lua";
["http.cookie"] = "http/cookie.lua";
["http.h1_connection"] = "http/h1_connection.lua";
Expand Down
30 changes: 30 additions & 0 deletions http/client.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ local cqueues_dns = require "cqueues.dns"
local cqueues_dns_record = require "cqueues.dns.record"
local http_tls = require "http.tls"
local http_util = require "http.util"
local http_client_pool = require "http.client_pool"
local connection_common = require "http.connection_common"
local onerror = connection_common.onerror
local new_h1_connection = require "http.h1_connection".new
Expand Down Expand Up @@ -155,6 +156,9 @@ local record_ipv4_mt = {
__name = "http.client.record.ipv4";
__index = record_ipv4_methods;
}
function record_ipv4_methods:pool_key()
return http_client_pool.ipv4_pool_key(self.addr, self.port)
end
function records_methods:add_v4(addr, port)
local n = self.n + 1
self[n] = setmetatable({ addr = addr, port = port }, record_ipv4_mt)
Expand All @@ -168,6 +172,9 @@ local record_ipv6_mt = {
__name = "http.client.record.ipv6";
__index = record_ipv6_methods;
}
function record_ipv6_methods:pool_key()
return http_client_pool.ipv6_pool_key(self.addr, self.port)
end
function records_methods:add_v6(addr, port)
if type(addr) == "string" then
-- Normalise
Expand All @@ -188,6 +195,9 @@ local record_unix_mt = {
__name = "http.client.record.unix";
__index = record_unix_methods;
}
function record_unix_methods:pool_key()
return http_client_pool.unix_pool_key(self.path)
end
function records_methods:add_unix(path)
local n = self.n + 1
self[n] = setmetatable({ path = path }, record_unix_mt)
Expand Down Expand Up @@ -253,6 +263,23 @@ local function connect(options, timeout)

local records = lookup_records(options, timeout)

local pool = options.pool
if pool then
for i=1, records.n do
local dst_pool = pool[records[i]:pool_key()]
if dst_pool then
while true do
local c = http_client_pool.find_connection(dst_pool, options)
if not c then
break
end
-- TODO: if c doesn't work, try another one
return c
end
end
end
end

local bind = options.bind
if bind ~= nil then
assert(type(bind) == "string")
Expand Down Expand Up @@ -301,6 +328,9 @@ local function connect(options, timeout)
local ok
ok, lasterr, lasterrno = c:connect(deadline and deadline-monotime())
if ok then
if pool then
pool:add(c)
end
return c
end
c:close()
Expand Down
142 changes: 142 additions & 0 deletions http/client_pool.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
local cs = require "cqueues.socket"

local function reuse_connection(candidate, connect_options)
-- Assume family/host/port/path already checked

if candidate.socket == nil then
return false
end

if connect_options.v6only then
-- TODO
return false
end

local bind = connect_options.bind
if bind then
-- TODO: Use :localname()
return false
end

local version = connect_options.version
if version and version ~= candidate.version then
return false
end

if candidate.version < 2 then
-- Check if connection already in use (avoid pipelining)
if candidate.req_locked then
return false
end
elseif candidate.version == 2 then
-- Check if http2 connection is nearing end of stream ids
local highest_stream_id = math.max(candidate.highest_odd_stream, candidate.highest_even_stream)
-- The stream id is a unsigned 31bit integer. we don't reuse if it's past half way
if highest_stream_id > 0x3fffffff then
return false
end

local h2_settings = connect_options.h2_settings
if h2_settings then
-- TODO: check (and possibly change on connection?)
return false
end
end

-- Do TLS check last, as it is the most expensive
if connect_options.tls then
-- TODO: compare TLS parameters
return false
end

-- Check to see if connection has been closed
local ok, err = candidate.socket:fill(1, 0)
if not ok and err == nil then
-- has been closed
return false
end

return true
end

local pool_methods = {}
local pool_mt = {
__name = "http.client.pool";
__index = pool_methods;
}

local function new_pool()
return setmetatable({}, pool_mt)
end

local function ipv4_pool_key(addr, port)
return string.format("%d:%s:%s", cs.AF_INET, addr, port)
end

local function ipv6_pool_key(addr, port)
return string.format("%d:[%s]:%s", cs.AF_INET6, addr, port)
end

local function unix_pool_key(path)
return string.format("%d:%s", cs.AF_UNIX, path)
end

local function connection_pool_key(connection)
-- XXX: if using a proxy this may not be correct
local family, a, b = connection:peername()
if family == cs.AF_INET then
return ipv4_pool_key(a, b)
elseif family == cs.AF_INET6 then
return ipv6_pool_key(a, b)
elseif family == cs.AF_UNIX then
return unix_pool_key(a)
end
end

function pool_methods:add(connection)
local key = connection_pool_key(connection)
if not key then
return false
end
local dst_pool = self[key]
if dst_pool == nil then
dst_pool = {}
self[key] = dst_pool
end
dst_pool[connection] = true
return true
end

function pool_methods:remove(connection)
local key = connection_pool_key(connection)
if not key then
return true
end
local dst_pool = self[key]
if dst_pool == nil then
return true
end
dst_pool[connection] = nil
if next(dst_pool) == nil then
self[key] = nil
end
return true
end

local function find_connection(dst_pool, connect_options)
for connection in pairs(dst_pool) do
if reuse_connection(connection, connect_options) then
return connection
end
end
return nil
end

return {
ipv4_pool_key = ipv4_pool_key;
ipv6_pool_key = ipv6_pool_key;
unix_pool_key = unix_pool_key;

new = new_pool;
find_connection = find_connection;
}
7 changes: 5 additions & 2 deletions http/request.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ local uri_patts = require "lpeg_patterns.uri"
local basexx = require "basexx"
local client = require "http.client"
local new_headers = require "http.headers".new
local http_client_pool = require "http.client_pool"
local http_cookie = require "http.cookie"
local http_hsts = require "http.hsts"
local http_socks = require "http.socks"
Expand All @@ -17,6 +18,7 @@ local default_user_agent = string.format("%s/%s", http_version.name, http_versio
local default_hsts_store = http_hsts.new_store()
local default_proxies = http_proxies.new():update()
local default_cookie_store = http_cookie.new_store()
local default_connection_pool = http_client_pool.new()

local default_h2_settings = {
ENABLE_PUSH = false;
Expand All @@ -26,6 +28,7 @@ local request_methods = {
hsts = default_hsts_store;
proxies = default_proxies;
cookie_store = default_cookie_store;
pool = default_connection_pool;
is_top_level = true;
site_for_cookies = nil;
expect_100_timeout = 1;
Expand Down Expand Up @@ -125,6 +128,7 @@ function request_methods:clone()
hsts = rawget(self, "hsts");
proxies = rawget(self, "proxies");
cookie_store = rawget(self, "cookie_store");
pool = rawget(self, "pool");
is_top_level = rawget(self, "is_top_level");
site_for_cookies = rawget(self, "site_for_cookies");
expect_100_timeout = rawget(self, "expect_100_timeout");
Expand Down Expand Up @@ -491,6 +495,7 @@ function request_methods:go(timeout)
if not connection then
local err, errno
connection, err, errno = client.connect({
pool = self.pool;
host = host;
port = port;
bind = self.bind;
Expand All @@ -503,8 +508,6 @@ function request_methods:go(timeout)
if connection == nil then
return nil, err, errno
end
-- Close the connection (and free resources) when done
connection:onidle(connection.close)
end

local stream do
Expand Down

0 comments on commit 073392a

Please sign in to comment.