Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client connection pooling #121

Open
wants to merge 2 commits into
base: client.dns_resolver
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions http-scm-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ build = {
modules = {
["http.bit"] = "http/bit.lua";
["http.client"] = "http/client.lua";
["http.client_pool"] = "http/client_pool.lua";
["http.connection_common"] = "http/connection_common.lua";
["http.cookie"] = "http/cookie.lua";
["http.h1_connection"] = "http/h1_connection.lua";
Expand Down
32 changes: 31 additions & 1 deletion http/client.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ local cqueues_dns = require "cqueues.dns"
local cqueues_dns_record = require "cqueues.dns.record"
local http_tls = require "http.tls"
local http_util = require "http.util"
local http_client_pool = require "http.client_pool"
local connection_common = require "http.connection_common"
local onerror = connection_common.onerror
local new_h1_connection = require "http.h1_connection".new
Expand Down Expand Up @@ -166,6 +167,9 @@ local record_ipv4_mt = {
__name = "http.client.record.ipv4";
__index = record_ipv4_methods;
}
function record_ipv4_methods:pool_key()
return http_client_pool.ipv4_pool_key(self.addr, self.port)
end
function records_methods:add_v4(addr, port)
local n = self.n + 1
self[n] = setmetatable({ addr = addr, port = port }, record_ipv4_mt)
Expand All @@ -179,6 +183,9 @@ local record_ipv6_mt = {
__name = "http.client.record.ipv6";
__index = record_ipv6_methods;
}
function record_ipv6_methods:pool_key()
return http_client_pool.ipv6_pool_key(self.addr, self.port)
end
function records_methods:add_v6(addr, port)
if type(addr) == "string" then
-- Normalise
Expand All @@ -199,6 +206,9 @@ local record_unix_mt = {
__name = "http.client.record.unix";
__index = record_unix_methods;
}
function record_unix_methods:pool_key()
return http_client_pool.unix_pool_key(self.path)
end
function records_methods:add_unix(path)
local n = self.n + 1
self[n] = setmetatable({ path = path }, record_unix_mt)
Expand Down Expand Up @@ -273,6 +283,24 @@ local function connect(options, timeout)

local records = lookup_records(options, timeout)

local lasterr, lasterrno = "The name does not resolve for the supplied parameters", nil
if records.n == 0 then
return nil, lasterr, lasterrno
end

local pool = options.pool
if pool then
for i=1, records.n do
local dst_pool = pool[records[i]:pool_key()]
if dst_pool then
local c = http_client_pool.find_connection(dst_pool, options)
if c then
return c
end
end
end
end

local bind = options.bind
if bind ~= nil then
assert(type(bind) == "string")
Expand Down Expand Up @@ -303,7 +331,6 @@ local function connect(options, timeout)
nodelay = true;
}

local lasterr, lasterrno = "The name does not resolve for the supplied parameters"
local i = 1
while i <= records.n do
local rec = records[i]
Expand All @@ -321,6 +348,9 @@ local function connect(options, timeout)
local ok
ok, lasterr, lasterrno = c:connect(deadline and deadline-monotime())
if ok then
if pool then
pool:add(c)
end
return c
end
c:close()
Expand Down
142 changes: 142 additions & 0 deletions http/client_pool.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
local cs = require "cqueues.socket"

local function reuse_connection(candidate, connect_options)
-- Assume family/host/port/path already checked

if candidate.socket == nil then
return false
end

if connect_options.v6only then
-- TODO
return false
end

local bind = connect_options.bind
if bind then
-- TODO: Use :localname()
return false
end

local version = connect_options.version
if version and version ~= candidate.version then
return false
end

if candidate.version < 2 then
-- Check if connection already in use (avoid pipelining)
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: check pipeline depth instead.
Also, might want to find shortest pipeline in the pool?

if candidate.req_locked then
return false
end
elseif candidate.version == 2 then
-- Check if http2 connection is nearing end of stream ids
local highest_stream_id = math.max(candidate.highest_odd_stream, candidate.highest_even_stream)
-- The stream id is a unsigned 31bit integer. we don't reuse if it's past half way
if highest_stream_id > 0x3fffffff then
return false
end

local h2_settings = connect_options.h2_settings
if h2_settings then
-- TODO: check (and possibly change on connection?)
return false
end
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should check MAX_CONCURRENT_STREAMS

end

-- Do TLS check last, as it is the most expensive
if connect_options.tls then
-- TODO: compare TLS parameters
return false
end

-- Check to see if connection has been closed
local ok, err = candidate.socket:fill(1, 0)
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Factor out into a "is alive" function. Can be reused for cleaning the pool

if not ok and err == nil then
-- has been closed
return false
end

return true
end

local pool_methods = {}
local pool_mt = {
__name = "http.client.pool";
__index = pool_methods;
}

local function new_pool()
return setmetatable({}, pool_mt)
end

local function ipv4_pool_key(addr, port)
return string.format("%d:%s:%s", cs.AF_INET, addr, port)
end

local function ipv6_pool_key(addr, port)
return string.format("%d:[%s]:%s", cs.AF_INET6, addr, port)
end

local function unix_pool_key(path)
return string.format("%d:%s", cs.AF_UNIX, path)
end

local function connection_pool_key(connection)
-- XXX: if using a proxy this may not be correct
local family, a, b = connection:peername()
if family == cs.AF_INET then
return ipv4_pool_key(a, b)
elseif family == cs.AF_INET6 then
return ipv6_pool_key(a, b)
elseif family == cs.AF_UNIX then
return unix_pool_key(a)
end
end

function pool_methods:add(connection)
local key = connection_pool_key(connection)
if not key then
return false
end
local dst_pool = self[key]
if dst_pool == nil then
dst_pool = {}
self[key] = dst_pool
end
dst_pool[connection] = true
return true
end

function pool_methods:remove(connection)
local key = connection_pool_key(connection)
if not key then
return true
end
local dst_pool = self[key]
if dst_pool == nil then
return true
end
dst_pool[connection] = nil
if next(dst_pool) == nil then
self[key] = nil
end
return true
end

local function find_connection(dst_pool, connect_options)
for connection in pairs(dst_pool) do
if reuse_connection(connection, connect_options) then
return connection
end
end
return nil
end

return {
ipv4_pool_key = ipv4_pool_key;
ipv6_pool_key = ipv6_pool_key;
unix_pool_key = unix_pool_key;

new = new_pool;
find_connection = find_connection;
}
7 changes: 5 additions & 2 deletions http/request.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ local uri_patts = require "lpeg_patterns.uri"
local basexx = require "basexx"
local client = require "http.client"
local new_headers = require "http.headers".new
local http_client_pool = require "http.client_pool"
local http_cookie = require "http.cookie"
local http_hsts = require "http.hsts"
local http_socks = require "http.socks"
Expand All @@ -17,6 +18,7 @@ local default_user_agent = string.format("%s/%s", http_version.name, http_versio
local default_hsts_store = http_hsts.new_store()
local default_proxies = http_proxies.new():update()
local default_cookie_store = http_cookie.new_store()
local default_connection_pool = http_client_pool.new()

local default_h2_settings = {
ENABLE_PUSH = false;
Expand All @@ -26,6 +28,7 @@ local request_methods = {
hsts = default_hsts_store;
proxies = default_proxies;
cookie_store = default_cookie_store;
pool = default_connection_pool;
is_top_level = true;
site_for_cookies = nil;
expect_100_timeout = 1;
Expand Down Expand Up @@ -125,6 +128,7 @@ function request_methods:clone()
hsts = rawget(self, "hsts");
proxies = rawget(self, "proxies");
cookie_store = rawget(self, "cookie_store");
pool = rawget(self, "pool");
is_top_level = rawget(self, "is_top_level");
site_for_cookies = rawget(self, "site_for_cookies");
expect_100_timeout = rawget(self, "expect_100_timeout");
Expand Down Expand Up @@ -490,6 +494,7 @@ function request_methods:go(timeout)
if not connection then
local err, errno
connection, err, errno = client.connect({
pool = self.pool;
host = host;
port = port;
bind = self.bind;
Expand All @@ -502,8 +507,6 @@ function request_methods:go(timeout)
if connection == nil then
return nil, err, errno
end
-- Close the connection (and free resources) when done
connection:onidle(connection.close)
end

local stream do
Expand Down