415 lines
11 KiB
Elixir
415 lines
11 KiB
Elixir
# Pleroma: A lightweight social networking server
|
|
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
|
# SPDX-License-Identifier: AGPL-3.0-only
|
|
|
|
defmodule Pleroma.Pool.Connections do
|
|
use GenServer
|
|
|
|
require Logger
|
|
|
|
@typedoc "Key under which a connection is stored in the pool state."
@type domain :: String.t()

@typedoc "Connection record tracked by the pool."
@type conn :: Pleroma.Gun.Conn.t()

@typedoc "Pool state: the tracked connections by key plus the options given at start."
@type t :: %__MODULE__{
        conns: %{domain() => conn()},
        opts: keyword()
      }

defstruct [conns: %{}, opts: []]
|
|
|
|
alias Pleroma.Gun.API
|
|
alias Pleroma.Gun.Conn
|
|
|
|
@doc """
Starts the pool process and registers it under `name`.

`opts` is handed to `init/1` and stored as the pool options.
"""
@spec start_link({atom(), keyword()}) :: {:ok, pid()}
def start_link({name, opts}), do: GenServer.start_link(__MODULE__, opts, name: name)
|
|
|
|
@impl true
def init(opts) do
  # `conns` is left to the struct default, which is an empty map.
  {:ok, %__MODULE__{opts: opts}}
end
|
|
|
|
@doc """
Asks the pool registered as `name` for a connection to `url`.

`url` may be a string or an already parsed `URI`. The call returns whatever the
pool replies to `{:checkin, uri}`: a connection pid, or `nil` when no usable
connection is tracked. The call timeout is read from
`[:connections_pool, :receive_connection_timeout]` and defaults to 250 ms.
"""
@spec checkin(String.t() | URI.t(), atom()) :: pid() | nil
def checkin(url, name)

def checkin(url, name) when is_binary(url) do
  url
  |> URI.parse()
  |> checkin(name)
end

def checkin(%URI{} = uri, name) do
  call_timeout = Pleroma.Config.get([:connections_pool, :receive_connection_timeout], 250)

  GenServer.call(name, {:checkin, uri}, call_timeout)
end
|
|
|
|
@doc """
Asynchronously asks the pool registered as `name` to open a connection to `url`.

Caller-supplied `opts` win; `:receive`, `:retry`, `:retry_timeout` and
`:await_up_timeout` are filled in from the `:connections_pool` config (or the
built-in fallbacks) when absent. Always returns `:ok` because the request is a cast.
"""
@spec open_conn(String.t() | URI.t(), atom(), keyword()) :: :ok
def open_conn(url, name, opts \\ [])

def open_conn(url, name, opts) when is_binary(url) do
  url
  |> URI.parse()
  |> open_conn(name, opts)
end

def open_conn(%URI{} = uri, name, opts) do
  pool_opts = Pleroma.Config.get([:connections_pool], [])

  defaults = %{
    receive: false,
    retry: pool_opts[:retry] || 5,
    retry_timeout: pool_opts[:retry_timeout] || 100,
    await_up_timeout: pool_opts[:await_up_timeout] || 5_000
  }

  # Values passed by the caller take precedence over the defaults.
  conn_opts = Map.merge(defaults, Enum.into(opts, %{}))

  GenServer.cast(name, {:open_conn, %{opts: conn_opts, uri: uri}})
end
|
|
|
|
@doc """
Tells whether a live process is registered under `name`.

Returns `false` when nothing is registered under that name.
"""
@spec alive?(atom()) :: boolean()
def alive?(name) do
  case Process.whereis(name) do
    nil -> false
    registered -> Process.alive?(registered)
  end
end
|
|
|
|
@doc """
Fetches the full state struct of the pool registered as `name`.
"""
@spec get_state(atom()) :: t()
def get_state(name), do: GenServer.call(name, :state)
|
|
|
|
@doc """
Tells the pool registered as `name` that `pid` no longer uses connection `conn`.

The notification is a cast, so the function returns `:ok` immediately.
"""
@spec checkout(pid(), pid(), atom()) :: :ok
def checkout(conn, pid, name), do: GenServer.cast(name, {:checkout, conn, pid})
|
|
|
|
# Opens a connection for `uri`. While the pool holds fewer entries than
# `:max_connections` the connection is opened directly; otherwise an unused
# connection has to be evicted first.
@impl true
def handle_cast({:open_conn, %{opts: opts, uri: uri}}, state) do
  Logger.debug("opening new #{compose_uri(uri)}")

  key = compose_key(uri)
  limit = state.opts[:max_connections]

  case map_size(state.conns) < limit do
    true -> open_conn(key, uri, state, opts)
    false -> try_to_open_conn(key, uri, state, opts)
  end
end
|
|
|
|
# Releases `pid`'s claim on the connection `conn_pid`. Once nobody uses the
# connection any more it is marked `:idle`; otherwise its state is kept.
@impl true
def handle_cast({:checkout, conn_pid, pid}, state) do
  Logger.debug("checkout #{inspect(conn_pid)}")

  new_state =
    if Process.alive?(conn_pid) do
      case find_conn(state.conns, conn_pid) do
        {key, conn} ->
          # `used_by` entries are matched on their first element.
          remaining = List.keydelete(conn.used_by, pid, 0)
          conn_state = if remaining == [], do: :idle, else: conn.conn_state

          put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: remaining})

        nil ->
          Logger.info("checkout for alive conn #{inspect(conn_pid)}, but is not in state")
          state
      end
    else
      Logger.warn("checkout for closed conn #{inspect(conn_pid)}")
      state
    end

  {:noreply, new_state}
end
|
|
|
|
# Hands out the pooled connection for `uri` when it exists and is up. The
# entry's reference time and crf score are refreshed, it is marked `:active`
# and the caller is recorded in `used_by`. Replies `nil` when the connection
# is down or unknown.
@impl true
def handle_call({:checkin, uri}, from, state) do
  Logger.debug("checkin #{compose_uri(uri)}")
  key = compose_key(uri)

  case state.conns[key] do
    %{conn: conn_pid, gun_state: :up} = pooled ->
      Logger.debug("reusing conn #{compose_uri(uri)}")

      now = :os.system_time(:second)
      since_last_use = now - pooled.last_reference

      refreshed = %{
        pooled
        | last_reference: now,
          crf: crf(since_last_use, 100, pooled.crf),
          conn_state: :active,
          used_by: [from | pooled.used_by]
      }

      {:reply, conn_pid, put_in(state.conns[key], refreshed)}

    %{gun_state: :down} ->
      {:reply, nil, state}

    nil ->
      {:reply, nil, state}
  end
end
|
|
|
|
# Replies with the complete pool state and leaves it untouched.
@impl true
def handle_call(:state, _from, state) do
  {:reply, state, state}
end
|
|
|
|
# Handles gun's `:gun_up` message. If the connection is alive and still
# tracked under the key derived from its gun info, it is marked up and active,
# its reference time and crf score are refreshed and its retry counter is reset.
# An alive connection that is no longer tracked gets closed.
@impl true
def handle_info({:gun_up, conn_pid, _protocol}, state) do
  new_state =
    with true <- Process.alive?(conn_pid),
         gun_key when is_binary(gun_key) <- compose_key_gun_info(conn_pid),
         {key, conn} <- find_conn(state.conns, conn_pid, gun_key) do
      now = :os.system_time(:second)
      since_last_use = now - conn.last_reference

      revived = %{
        conn
        | gun_state: :up,
          last_reference: now,
          crf: crf(since_last_use, 100, conn.crf),
          conn_state: :active,
          retries: 0
      }

      put_in(state.conns[key], revived)
    else
      :error_gun_info ->
        Logger.warn(":gun.info caused error")
        state

      false ->
        Logger.warn(":gun_up message for closed conn #{inspect(conn_pid)}")
        state

      nil ->
        Logger.warn(
          ":gun_up message for alive conn #{inspect(conn_pid)}, but deleted from state"
        )

        :ok = API.close(conn_pid)

        state
    end

  {:noreply, new_state}
end
|
|
|
|
# Handles gun's `:gun_down` message for `conn_pid`.
#
# * tracked and alive, retry limit reached -> the connection is closed and
#   removed from the pool;
# * tracked and alive, below the limit -> it is marked `:down` and its retry
#   counter is incremented;
# * tracked but the process is already dead -> the stale entry is removed, so
#   `checkin` cannot hand out a dead pid and the slot is freed;
# * not tracked but alive -> the orphaned connection is closed;
# * not tracked and dead -> nothing to do.
#
# The retry limit is read from `[:connections_pool, :retry]` (fallback 5), the
# same setting `open_conn/3` uses for the `:retry` option of new connections.
@impl true
def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed}, state) do
  max_retries = Pleroma.Config.get([:connections_pool, :retry], 5)

  # The entry is looked up by pid only: gun info cannot be queried for a
  # process that may already be dead.
  new_state =
    case {find_conn(state.conns, conn_pid), Process.alive?(conn_pid)} do
      {{key, %{retries: retries} = conn}, true} when retries >= max_retries ->
        Logger.debug("closing conn after #{retries} retries #{inspect(conn_pid)}")
        :ok = API.close(conn.conn)

        %{state | conns: Map.delete(state.conns, key)}

      {{key, conn}, true} ->
        put_in(state.conns[key], %{conn | gun_state: :down, retries: conn.retries + 1})

      {{key, _conn}, false} ->
        # gun can send gun_down for a connection whose process is already gone
        Logger.warn(":gun_down message for closed conn #{inspect(conn_pid)}")

        %{state | conns: Map.delete(state.conns, key)}

      {nil, true} ->
        Logger.warn(
          ":gun_down message for alive conn #{inspect(conn_pid)}, but deleted from state"
        )

        :ok = API.close(conn_pid)

        state

      {nil, false} ->
        Logger.warn(":gun_down message for closed conn #{inspect(conn_pid)}")
        state
    end

  {:noreply, new_state}
end
|
|
|
|
# Builds the pool key "scheme:host:port" for a URI.
defp compose_key(%URI{} = uri) do
  "#{uri.scheme}:#{uri.host}:#{uri.port}"
end
|
|
|
|
# Builds the pool key "scheme:host:port" from the info gun reports for `pid`.
# Returns `:error_gun_info` when anything in here raises.
defp compose_key_gun_info(pid) do
  # sometimes :gun.info can raise MatchError, which would terminate the pool
  %{origin_host: origin_host, origin_scheme: scheme, origin_port: port} = API.info(pid)

  # When `:inet.ntoa/1` rejects the value, the host is used unchanged;
  # otherwise its formatted result is used.
  host =
    with {:error, :einval} <- :inet.ntoa(origin_host) do
      origin_host
    end

  "#{scheme}:#{host}:#{port}"
rescue
  _ -> :error_gun_info
end
|
|
|
|
# Finds the `{key, conn}` entry whose connection pid is `conn_pid`;
# returns `nil` when there is none.
defp find_conn(conns, conn_pid) do
  Enum.find(conns, fn {_key, %{conn: pooled_pid}} -> pooled_pid == conn_pid end)
end
|
|
|
|
# Finds the `{key, conn}` entry stored under `conn_key` whose connection pid
# is `conn_pid`; returns `nil` when key or pid do not match.
defp find_conn(conns, conn_pid, conn_key) do
  Enum.find(conns, fn
    {^conn_key, conn} -> conn.conn == conn_pid
    _other -> false
  end)
end
|
|
|
|
# Opens a connection to `uri` through an HTTP proxy: connects to the proxy,
# waits until it is up, sends a connect request for the destination and
# stores the connection under `key` once the proxy answers 200. On any
# failure the error is logged and the state is returned unchanged.
defp open_conn(key, uri, state, %{proxy: {proxy_host, proxy_port}} = opts) do
  destination = destination_opts(uri)
  connect_opts = add_http2_opts(destination, uri.scheme, Map.get(opts, :tls_opts, []))

  # `:tls_opts` is passed along in `connect_opts`, not in the options for opening.
  open_opts = Map.delete(opts, :tls_opts)

  with {:ok, conn} <- API.open(proxy_host, proxy_port, open_opts),
       {:ok, _} <- API.await_up(conn),
       stream = API.connect(conn, connect_opts),
       {:response, :fin, 200, _} <- API.await(conn, stream) do
    pooled = %Conn{
      conn: conn,
      gun_state: :up,
      conn_state: :active,
      last_reference: :os.system_time(:second)
    }

    {:noreply, put_in(state.conns[key], pooled)}
  else
    error ->
      target = "#{uri.scheme}://#{compose_uri(uri)}"

      Logger.warn(
        "Received error on opening connection with http proxy #{target}: #{inspect(error)}"
      )

      {:noreply, state}
  end
end
|
|
|
|
# Opens a connection to `uri` through a SOCKS proxy and stores it under `key`.
# The SOCKS version is 4 when the proxy type ends in "4", otherwise 5. On any
# failure the error is logged and the state is returned unchanged.
defp open_conn(key, uri, state, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do
  type_suffix = proxy_type |> to_string() |> String.last()
  version = if type_suffix == "4", do: 4, else: 5

  socks_opts =
    uri
    |> destination_opts()
    |> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
    |> Map.put(:version, version)

  open_opts = Map.merge(opts, %{protocols: [:socks], socks_opts: socks_opts})

  with {:ok, conn} <- API.open(proxy_host, proxy_port, open_opts),
       {:ok, _} <- API.await_up(conn) do
    pooled = %Conn{
      conn: conn,
      gun_state: :up,
      conn_state: :active,
      last_reference: :os.system_time(:second)
    }

    {:noreply, put_in(state.conns[key], pooled)}
  else
    error ->
      target = "#{uri.scheme}://#{compose_uri(uri)}"

      Logger.warn(
        "Received error on opening connection with socks proxy #{target}: #{inspect(error)}"
      )

      {:noreply, state}
  end
end
|
|
|
|
# Opens a direct connection to `uri` (no proxy) and stores it under `key`.
# On any failure the error is logged and the state is returned unchanged.
defp open_conn(key, %URI{host: host, port: port} = uri, state, opts) do
  Logger.debug("opening conn #{compose_uri(uri)}")
  {_type, target_host} = Pleroma.HTTP.Adapter.domain_or_ip(host)

  with {:ok, conn} <- API.open(target_host, port, opts),
       {:ok, _} <- API.await_up(conn) do
    pooled = %Conn{
      conn: conn,
      gun_state: :up,
      conn_state: :active,
      last_reference: :os.system_time(:second)
    }

    new_state = put_in(state.conns[key], pooled)

    Logger.debug("new conn opened #{compose_uri(uri)}")
    Logger.debug("replying to the call #{compose_uri(uri)}")
    {:noreply, new_state}
  else
    error ->
      target = "#{uri.scheme}://#{compose_uri(uri)}"

      Logger.warn("Received error on opening connection #{target}: #{inspect(error)}")

      {:noreply, state}
  end
end
|
|
|
|
# Builds the `%{host: ..., port: ...}` map describing the destination of `uri`,
# with the host in the form returned by `Pleroma.HTTP.Adapter.domain_or_ip/1`.
defp destination_opts(%URI{} = uri) do
  {_type, host} = Pleroma.HTTP.Adapter.domain_or_ip(uri.host)

  %{host: host, port: uri.port}
end
|
|
|
|
# For "https" destinations adds the HTTP/2 protocol, the TLS transport and the
# given TLS options to `opts`; any other scheme leaves `opts` untouched.
defp add_http2_opts(opts, "https", tls_opts) do
  opts
  |> Map.put(:protocols, [:http2])
  |> Map.put(:transport, :tls)
  |> Map.put(:tls_opts, tls_opts)
end

defp add_http2_opts(opts, _scheme, _tls_opts), do: opts
|
|
|
|
@doc """
Returns the connections that can be evicted, least used first.

A connection qualifies when it is `:idle` and its `used_by` list is empty.
The result is ordered by ascending `crf` score; equal scores are ordered by
ascending `last_reference`, so the head of the list is the best candidate
for closing.
"""
@spec get_unused_conns(map()) :: [{domain(), conn()}]
def get_unused_conns(conns) do
  conns
  |> Enum.filter(fn {_key, conn} -> conn.conn_state == :idle and conn.used_by == [] end)
  # Sorting on a composite key gives a consistent total order. Requiring both
  # fields to be `<=` at once, as a pairwise comparator, is not a valid
  # ordering and could put the most used connection first.
  |> Enum.sort_by(fn {_key, conn} -> {conn.crf, conn.last_reference} end)
end
|
|
|
|
# Used when the pool is full: closes the first connection returned by
# `get_unused_conns/1`, drops it from the state and then opens the requested
# connection. When no connection is unused, nothing is opened.
defp try_to_open_conn(key, uri, state, opts) do
  Logger.debug("try to open conn #{compose_uri(uri)}")

  with [{victim_key, victim} | _rest] <- get_unused_conns(state.conns),
       :ok <- API.close(victim.conn) do
    Logger.debug(
      "least used conn found and closed #{inspect(victim.conn)} #{compose_uri(uri)}"
    )

    trimmed = %{state | conns: Map.delete(state.conns, victim_key)}
    open_conn(key, uri, trimmed, opts)
  else
    [] -> {:noreply, state}
  end
end
|
|
|
|
@doc """
Computes the next crf score of a connection.

The previous score `crf` is weighted by `0.5` raised to `current / steps`
and `1` is added for the new reference, so the old score halves for every
`steps` units in `current`.
"""
def crf(current, steps, crf) do
  decay = :math.pow(0.5, current / steps)

  decay * crf + 1
end
|
|
|
|
@doc """
Returns the host of `uri` immediately followed by its path, as used in log messages.

A missing host or path contributes an empty string.
"""
def compose_uri(%URI{host: host, path: path}) do
  "#{host}#{path}"
end
|
|
end
|