mirror of
https://github.com/AikidoSec/safe-chain.git
synced 2026-05-26 20:20:49 +00:00
Decompose server
This commit is contained in:
parent
dfec771fe3
commit
a55d8217e6
3 changed files with 117 additions and 113 deletions
|
|
@ -1,37 +1,11 @@
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use rama::{
|
use rama::telemetry::tracing::{
|
||||||
extensions::ExtensionsMut,
|
self,
|
||||||
http::{
|
metadata::LevelFilter,
|
||||||
client::EasyHttpWebClient,
|
subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter},
|
||||||
layer::{
|
|
||||||
remove_header::{RemoveRequestHeaderLayer, RemoveResponseHeaderLayer},
|
|
||||||
trace::TraceLayer,
|
|
||||||
upgrade::UpgradeLayer,
|
|
||||||
},
|
|
||||||
matcher::MethodMatcher,
|
|
||||||
server::HttpServer,
|
|
||||||
service::web::response::IntoResponse,
|
|
||||||
Request, Response, StatusCode,
|
|
||||||
},
|
|
||||||
layer::ConsumeErrLayer,
|
|
||||||
net::{http::RequestContext, proxy::ProxyTarget, stream::layer::http::BodyLimitLayer},
|
|
||||||
rt::Executor,
|
|
||||||
service::service_fn,
|
|
||||||
tcp::{client::service::Forwarder, server::TcpListener},
|
|
||||||
telemetry::tracing::{
|
|
||||||
self,
|
|
||||||
metadata::LevelFilter,
|
|
||||||
subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter},
|
|
||||||
},
|
|
||||||
Layer, Service,
|
|
||||||
};
|
};
|
||||||
use std::{convert::Infallible, time::Duration};
|
mod server;
|
||||||
|
use server::proxy::run_server;
|
||||||
#[derive(Parser, Debug)]
|
|
||||||
struct Args {
|
|
||||||
#[arg(short, long, default_value_t = 0)]
|
|
||||||
port: u16,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
|
|
@ -52,85 +26,8 @@ fn setup_tracing() {
|
||||||
tracing::info!("Tracing is set up");
|
tracing::info!("Tracing is set up");
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run_server(port: u16) {
|
#[derive(Parser, Debug)]
|
||||||
let graceful = rama::graceful::Shutdown::default();
|
struct Args {
|
||||||
|
#[arg(short, long, default_value_t = 0)]
|
||||||
graceful.spawn_task_fn(move |guard| server_task(guard, port));
|
port: u16,
|
||||||
|
|
||||||
graceful
|
|
||||||
.shutdown_with_limit(Duration::from_secs(30))
|
|
||||||
.await
|
|
||||||
.expect("graceful shutdown");
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn server_task(guard: rama::graceful::ShutdownGuard, port: u16) {
|
|
||||||
let tcp_address = format!("127.0.0.1:{}", port);
|
|
||||||
|
|
||||||
let tcp_service = TcpListener::build()
|
|
||||||
.bind(tcp_address)
|
|
||||||
.await
|
|
||||||
.expect("bind tcp proxy");
|
|
||||||
|
|
||||||
let local_address = tcp_service.local_addr().expect("tcp proxy assigned a port");
|
|
||||||
tracing::info!("Safe-chain proxy running on {local_address}");
|
|
||||||
|
|
||||||
let exec = Executor::graceful(guard.clone());
|
|
||||||
|
|
||||||
let http_service = HttpServer::auto(exec).service(
|
|
||||||
(
|
|
||||||
TraceLayer::new_for_http(),
|
|
||||||
ConsumeErrLayer::default(),
|
|
||||||
UpgradeLayer::new(
|
|
||||||
MethodMatcher::CONNECT,
|
|
||||||
service_fn(http_connect_accept),
|
|
||||||
ConsumeErrLayer::default().into_layer(Forwarder::ctx()),
|
|
||||||
),
|
|
||||||
RemoveResponseHeaderLayer::hop_by_hop(),
|
|
||||||
RemoveRequestHeaderLayer::hop_by_hop(),
|
|
||||||
)
|
|
||||||
.into_layer(service_fn(http_plain_proxy)),
|
|
||||||
);
|
|
||||||
|
|
||||||
tcp_service
|
|
||||||
.serve_graceful(
|
|
||||||
guard,
|
|
||||||
(
|
|
||||||
// protect the http proxy from too large bodies, both from request and response end
|
|
||||||
BodyLimitLayer::symmetric(500 * 1024 * 1024),
|
|
||||||
)
|
|
||||||
.into_layer(http_service),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn http_connect_accept(mut req: Request) -> Result<(Response, Request), Response> {
|
|
||||||
match RequestContext::try_from(&req).map(|ctx| ctx.host_with_port()) {
|
|
||||||
Ok(authority) => {
|
|
||||||
tracing::info!(
|
|
||||||
server.address = %authority.host,
|
|
||||||
server.port = authority.port,
|
|
||||||
"accept CONNECT",
|
|
||||||
);
|
|
||||||
req.extensions_mut().insert(ProxyTarget(authority));
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
tracing::error!("error extracting authority: {err:?}");
|
|
||||||
return Err(StatusCode::BAD_REQUEST.into_response());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return Ok((StatusCode::OK.into_response(), req));
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn http_plain_proxy(req: Request) -> Result<Response, Infallible> {
|
|
||||||
let client = EasyHttpWebClient::default();
|
|
||||||
|
|
||||||
return match client.serve(req).await {
|
|
||||||
Ok(resp) => Ok(resp),
|
|
||||||
Err(err) => {
|
|
||||||
tracing::error!("Error forwarding request: {err:?}");
|
|
||||||
let resp = StatusCode::BAD_GATEWAY.into_response();
|
|
||||||
Ok(resp)
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
1
proxy/src/server/mod.rs
Normal file
1
proxy/src/server/mod.rs
Normal file
|
|
@ -0,0 +1 @@
|
||||||
|
pub mod proxy;
|
||||||
106
proxy/src/server/proxy.rs
Normal file
106
proxy/src/server/proxy.rs
Normal file
|
|
@ -0,0 +1,106 @@
|
||||||
|
use rama::{
|
||||||
|
extensions::ExtensionsMut,
|
||||||
|
http::{
|
||||||
|
client::EasyHttpWebClient,
|
||||||
|
layer::{
|
||||||
|
remove_header::{RemoveRequestHeaderLayer, RemoveResponseHeaderLayer},
|
||||||
|
trace::TraceLayer,
|
||||||
|
upgrade::UpgradeLayer,
|
||||||
|
},
|
||||||
|
matcher::MethodMatcher,
|
||||||
|
server::HttpServer,
|
||||||
|
service::web::response::IntoResponse,
|
||||||
|
Request, Response, StatusCode,
|
||||||
|
},
|
||||||
|
layer::ConsumeErrLayer,
|
||||||
|
net::{http::RequestContext, proxy::ProxyTarget, stream::layer::http::BodyLimitLayer},
|
||||||
|
rt::Executor,
|
||||||
|
service::service_fn,
|
||||||
|
tcp::{client::service::Forwarder, server::TcpListener},
|
||||||
|
telemetry::tracing::{self},
|
||||||
|
Layer, Service,
|
||||||
|
};
|
||||||
|
use std::{convert::Infallible, time::Duration};
|
||||||
|
|
||||||
|
pub async fn run_server(port: u16) {
|
||||||
|
let graceful = rama::graceful::Shutdown::default();
|
||||||
|
|
||||||
|
graceful.spawn_task_fn(move |guard| server_task(guard, port));
|
||||||
|
|
||||||
|
graceful
|
||||||
|
.shutdown_with_limit(Duration::from_secs(30))
|
||||||
|
.await
|
||||||
|
.expect("graceful shutdown");
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn server_task(guard: rama::graceful::ShutdownGuard, port: u16) {
|
||||||
|
let tcp_address = format!("127.0.0.1:{}", port);
|
||||||
|
|
||||||
|
let tcp_service = TcpListener::build()
|
||||||
|
.bind(tcp_address)
|
||||||
|
.await
|
||||||
|
.expect("bind tcp proxy");
|
||||||
|
|
||||||
|
let local_address = tcp_service.local_addr().expect("tcp proxy assigned a port");
|
||||||
|
tracing::info!("Safe-chain proxy running on {local_address}");
|
||||||
|
|
||||||
|
let exec = Executor::graceful(guard.clone());
|
||||||
|
|
||||||
|
let http_service = HttpServer::auto(exec).service(
|
||||||
|
(
|
||||||
|
TraceLayer::new_for_http(),
|
||||||
|
ConsumeErrLayer::default(),
|
||||||
|
UpgradeLayer::new(
|
||||||
|
MethodMatcher::CONNECT,
|
||||||
|
service_fn(http_connect_accept),
|
||||||
|
ConsumeErrLayer::default().into_layer(Forwarder::ctx()),
|
||||||
|
),
|
||||||
|
RemoveResponseHeaderLayer::hop_by_hop(),
|
||||||
|
RemoveRequestHeaderLayer::hop_by_hop(),
|
||||||
|
)
|
||||||
|
.into_layer(service_fn(http_plain_proxy)),
|
||||||
|
);
|
||||||
|
|
||||||
|
tcp_service
|
||||||
|
.serve_graceful(
|
||||||
|
guard,
|
||||||
|
(
|
||||||
|
// protect the http proxy from too large bodies, both from request and response end
|
||||||
|
BodyLimitLayer::symmetric(500 * 1024 * 1024),
|
||||||
|
)
|
||||||
|
.into_layer(http_service),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn http_connect_accept(mut req: Request) -> Result<(Response, Request), Response> {
|
||||||
|
match RequestContext::try_from(&req).map(|ctx| ctx.host_with_port()) {
|
||||||
|
Ok(authority) => {
|
||||||
|
tracing::info!(
|
||||||
|
server.address = %authority.host,
|
||||||
|
server.port = authority.port,
|
||||||
|
"accept CONNECT",
|
||||||
|
);
|
||||||
|
req.extensions_mut().insert(ProxyTarget(authority));
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
tracing::error!("error extracting authority: {err:?}");
|
||||||
|
return Err(StatusCode::BAD_REQUEST.into_response());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return Ok((StatusCode::OK.into_response(), req));
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn http_plain_proxy(req: Request) -> Result<Response, Infallible> {
|
||||||
|
let client = EasyHttpWebClient::default();
|
||||||
|
|
||||||
|
return match client.serve(req).await {
|
||||||
|
Ok(resp) => Ok(resp),
|
||||||
|
Err(err) => {
|
||||||
|
tracing::error!("Error forwarding request: {err:?}");
|
||||||
|
let resp = StatusCode::BAD_GATEWAY.into_response();
|
||||||
|
Ok(resp)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue