Checkpoint, or something.

This commit is contained in:
2022-11-22 20:13:14 -08:00
parent 277125e1a0
commit 1d182a150f
9 changed files with 467 additions and 97 deletions

View File

@@ -1,3 +0,0 @@
fn main() -> Result<(), ()> {
Ok(())
}

View File

@@ -1,5 +1,3 @@
use std::net::SocketAddr;
use crate::address::SOCKSv5Address;
use crate::messages::{
AuthenticationMethod, ClientConnectionCommand, ClientConnectionCommandReadError,
@@ -8,14 +6,24 @@ use crate::messages::{
ServerAuthResponse, ServerAuthResponseWriteError, ServerChoice, ServerChoiceWriteError,
ServerResponse, ServerResponseStatus, ServerResponseWriteError,
};
use crate::security_parameters::SecurityParameters;
pub use crate::security_parameters::SecurityParameters;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use thiserror::Error;
use tokio::io::{copy_bidirectional, AsyncWriteExt};
use tokio::net::{TcpListener, TcpSocket, TcpStream, UdpSocket};
use tokio::task::JoinHandle;
use tracing::{field, info_span, Instrument, Span};
#[derive(Clone)]
pub struct SOCKSv5Server {
info: Arc<ServerInfo>,
}
struct ServerInfo {
security_parameters: SecurityParameters,
next_id: AtomicU64,
}
#[derive(Clone, Debug, Error, PartialEq)]
@@ -54,7 +62,10 @@ impl SOCKSv5Server {
/// be synced across all the instances.
pub fn new(security_parameters: SecurityParameters) -> Self {
SOCKSv5Server {
security_parameters,
info: Arc::new(ServerInfo {
security_parameters,
next_id: AtomicU64::new(1),
}),
}
}
@@ -70,7 +81,7 @@ impl SOCKSv5Server {
&self,
addr: A,
port: u16,
) -> Result<(), std::io::Error> {
) -> Result<JoinHandle<Result<(), std::io::Error>>, std::io::Error> {
let listener = match addr.into() {
SOCKSv5Address::IP4(x) => TcpListener::bind((x, port)).await?,
SOCKSv5Address::IP6(x) => TcpListener::bind((x, port)).await?,
@@ -86,45 +97,52 @@ impl SOCKSv5Server {
let second_life = self.clone();
tokio::task::spawn(async move {
if let Err(e) = second_life.server_loop(listener).await {
tracing::error!(
"{}:{}: server network error: {}",
sockaddr.ip(),
sockaddr.port(),
e
);
}
});
Ok(())
Ok(tokio::task::spawn(async move {
second_life.server_loop(listener).await
}))
}
/// Run the server loop for a particular listener. This routine will never actually
/// return except in error conditions.
async fn server_loop(self, listener: TcpListener) -> Result<(), std::io::Error> {
let local_addr = listener.local_addr()?;
loop {
let (socket, their_addr) = listener.accept().await?;
let accepted_span = info_span!(
"session",
server_address=?local_addr,
remote_address=?their_addr,
auth_method=field::Empty,
ident=field::Empty,
);
// before we do anything of note, make sure this connection is cool. we don't want
// to waste any resources (and certainly don't want to handle any data!) if this
// isn't someone we want to accept connections from.
tracing::trace!("Initial accept of connection from {}", their_addr);
if let Some(checker) = self.security_parameters.allow_connection {
if !checker(&their_addr) {
tracing::info!("Rejecting attempted connection from {}", their_addr,);
}
continue;
}
// continue this work in another task. we could absolutely do this work here,
// but just in case someone starts doing slow responses (or other nasty things),
// we want to make sure that that doesn't slow down our ability to accept other
// requests.
let me_again = self.clone();
tokio::task::spawn(async move {
if let Err(e) = me_again.start_authentication(their_addr, socket).await {
tracing::error!("{}: server handler failure: {}", their_addr, e);
accepted_span.in_scope(|| {
// before we do anything of note, make sure this connection is cool. we don't want
// to waste any resources (and certainly don't want to handle any data!) if this
// isn't someone we want to accept connections from.
if let Some(checker) = self.info.security_parameters.allow_connection {
if !checker(&their_addr) {
tracing::info!("Rejecting attempted connection from {}", their_addr,);
}
} else {
// continue this work in another task. we could absolutely do this work here,
// but just in case someone starts doing slow responses (or other nasty things),
// we want to make sure that that doesn't slow down our ability to accept other
// requests.
let span_again = accepted_span.clone();
let me_again = self.clone();
tokio::task::spawn(async move {
let session_identifier =
me_again.info.next_id.fetch_add(1, Ordering::SeqCst);
span_again.record("ident", &session_identifier);
if let Err(e) = me_again
.start_authentication(their_addr, socket)
.instrument(span_again)
.await
{
tracing::error!("{}: server handler failure: {}", their_addr, e);
}
});
}
});
}
@@ -139,8 +157,10 @@ impl SOCKSv5Server {
) -> Result<(), SOCKSv5ServerError> {
let greeting = ClientGreeting::read(&mut socket).await?;
match choose_authentication_method(&self.security_parameters, &greeting.acceptable_methods)
{
match choose_authentication_method(
&self.info.security_parameters,
&greeting.acceptable_methods,
) {
// it's not us, it's you. (we're just going to say no.)
None => {
tracing::trace!(
@@ -157,6 +177,7 @@ impl SOCKSv5Server {
// the gold standard. great choice.
Some(ChosenMethod::TLS(_converter)) => {
Span::current().record("auth_method", &"TLS");
unimplemented!()
}
@@ -176,6 +197,7 @@ impl SOCKSv5Server {
let its_all_good = ServerAuthResponse::success();
its_all_good.write(&mut socket).await?;
socket.flush().await?;
Span::current().record("auth_method", &"password");
self.choose_mode(socket, their_addr).await
} else {
let yeah_no = ServerAuthResponse::failure();
@@ -189,13 +211,10 @@ impl SOCKSv5Server {
// Um. I guess we're doing this unchecked. Yay?
Some(ChosenMethod::None) => {
tracing::trace!(
"{}: Just skipping the whole authentication thing.",
their_addr,
);
let nothin_i_guess = ServerChoice::option(AuthenticationMethod::None);
nothin_i_guess.write(&mut socket).await?;
socket.flush().await?;
Span::current().record("auth_method", &"unauthenticated");
self.choose_mode(socket, their_addr).await
}
}
@@ -214,7 +233,7 @@ impl SOCKSv5Server {
self.handle_udp_request(socket, their_addr, ccr).await?
}
ClientConnectionCommand::EstablishTCPStream => {
self.handle_tcp_request(socket, their_addr, ccr).await?
self.handle_tcp_request(socket, ccr).await?
}
ClientConnectionCommand::EstablishTCPPortBinding => {
self.handle_tcp_binding_request(socket, their_addr, ccr)
@@ -272,15 +291,11 @@ impl SOCKSv5Server {
async fn handle_tcp_request(
self,
mut stream: TcpStream,
their_addr: SocketAddr,
ccr: ClientConnectionRequest,
) -> Result<(), SOCKSv5ServerError> {
// Let the user know that we're maybe making progress
let my_addr = stream.local_addr()?;
tracing::info!(
"[{}] Handling TCP forward request from {}, seeking to connect to {}:{}",
my_addr,
their_addr,
"Handling TCP forward request to {}:{}",
ccr.destination_address,
ccr.destination_port
);
@@ -295,10 +310,10 @@ impl SOCKSv5Server {
}
};
tracing::trace!(
"Connection established to {}:{}",
ccr.destination_address,
ccr.destination_port
let tcp_forwarding_span = info_span!(
"tcp_forwarding",
target_address=?ccr.destination_address,
target_port=ccr.destination_port
);
// Now, for whatever reason -- and this whole thing sent me down a garden path
@@ -310,10 +325,15 @@ impl SOCKSv5Server {
bound_address: bound_address.ip().into(),
bound_port: bound_address.port(),
};
response.write(&mut stream).await?;
response
.write(&mut stream)
.instrument(tcp_forwarding_span.clone())
.await?;
// so now tie our streams together, and we're good to go
tie_streams(stream, outgoing_stream).await;
tie_streams(stream, outgoing_stream)
.instrument(tcp_forwarding_span)
.await;
Ok(())
}
@@ -369,47 +389,22 @@ impl SOCKSv5Server {
}
async fn tie_streams(mut left: TcpStream, mut right: TcpStream) {
let left_local_addr = left
.local_addr()
.expect("couldn't get left local address in tie_streams");
let left_peer_addr = left
.peer_addr()
.expect("couldn't get left peer address in tie_streams");
let right_local_addr = right
.local_addr()
.expect("couldn't get right local address in tie_streams");
let right_peer_addr = right
.peer_addr()
.expect("couldn't get right peer address in tie_streams");
tokio::task::spawn(async move {
tracing::info!(
"Setting up linkage {}/{} <-> {}/{}",
left_peer_addr,
left_local_addr,
right_local_addr,
right_peer_addr
);
match copy_bidirectional(&mut left, &mut right).await {
Ok((l2r, r2l)) => tracing::info!(
"Shutting down linkage {}/{} <-> {}/{} (sent {} and {} bytes, respectively)",
left_peer_addr,
left_local_addr,
right_local_addr,
right_peer_addr,
l2r,
r2l
),
Err(e) => tracing::warn!(
"Shutting down linkage {}/{} <-> {}/{} with error: {}",
left_peer_addr,
left_local_addr,
right_local_addr,
right_peer_addr,
e
),
let span_copy = Span::current();
tracing::info!("linking forwarding streams");
tokio::task::spawn(
async move {
match copy_bidirectional(&mut left, &mut right)
.instrument(span_copy)
.await
{
Ok((l2r, r2l)) => {
tracing::info!(sent = l2r, received = r2l, "shutting down streams")
}
Err(e) => tracing::warn!("Linked streams shut down with error: {}", e),
}
}
});
.instrument(Span::current()),
);
}
#[allow(clippy::upper_case_acronyms)]