Get TCP forwarding working with itself and an external client.

As it happens, this is a pretty major change, because I misunderstood
how the protocol actually works. Rather than having a single core
command channel and then a series of offshoots, SOCKSv5 does a separate
handshake for each individual command, and then uses the command stream
as a data stream. So ... whoops. So now the `SOCKSv5Server` sits on a
listener, instead, and farms each of the connections out to a task.
This commit is contained in:
2021-11-21 21:08:58 -08:00
parent 74f66ef747
commit 3737d0739d
4 changed files with 481 additions and 253 deletions

View File

@@ -1,7 +1,7 @@
use async_socks5::network::Builtin; use async_socks5::network::Builtin;
use async_socks5::server::{SOCKSv5Server, SecurityParameters}; use async_socks5::server::{SOCKSv5Server, SecurityParameters};
use async_std::io; use async_std::io;
use async_std::net::TcpListener; use futures::stream::StreamExt;
use simplelog::{ColorChoice, CombinedLogger, Config, LevelFilter, TermLogger, TerminalMode}; use simplelog::{ColorChoice, CombinedLogger, Config, LevelFilter, TermLogger, TerminalMode};
#[async_std::main] #[async_std::main]
@@ -14,17 +14,23 @@ async fn main() -> Result<(), io::Error> {
)]) )])
.expect("Couldn't initialize logger"); .expect("Couldn't initialize logger");
let main_listener = TcpListener::bind("127.0.0.1:0").await?;
let params = SecurityParameters { let params = SecurityParameters {
allow_unauthenticated: false, allow_unauthenticated: true,
allow_connection: None, allow_connection: None,
check_password: None, check_password: None,
connect_tls: None, connect_tls: None,
}; };
let server = SOCKSv5Server::new(Builtin::new(), params, main_listener); let mut server = SOCKSv5Server::new(Builtin::new(), params);
server.start("127.0.0.1", 9999).await?;
server.run().await?; let mut responses = Box::pin(server.subserver_results());
while let Some(response) = responses.next().await {
if let Err(e) = response {
println!("Server failed with: {}", e);
}
}
Ok(()) Ok(())
} }

View File

@@ -4,18 +4,19 @@ use crate::messages::{
ClientUsernamePassword, ServerAuthResponse, ServerChoice, ServerResponse, ServerResponseStatus, ClientUsernamePassword, ServerAuthResponse, ServerChoice, ServerResponse, ServerResponseStatus,
}; };
use crate::network::datagram::GenericDatagramSocket; use crate::network::datagram::GenericDatagramSocket;
use crate::network::generic::Networklike; use crate::network::generic::{IntoErrorResponse, Networklike};
use crate::network::listener::GenericListener; use crate::network::listener::GenericListener;
use crate::network::stream::GenericStream; use crate::network::stream::GenericStream;
use crate::network::SOCKSv5Address; use crate::network::SOCKSv5Address;
use async_std::io; use async_std::io;
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait; use async_trait::async_trait;
use futures::io::{AsyncRead, AsyncWrite}; use log::{info, trace, warn};
use log::{trace, warn}; use std::fmt::{Debug, Display};
use thiserror::Error; use thiserror::Error;
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum SOCKSv5Error { pub enum SOCKSv5Error<E: Debug + Display> {
#[error("SOCKSv5 serialization error: {0}")] #[error("SOCKSv5 serialization error: {0}")]
SerializationError(#[from] SerializationError), SerializationError(#[from] SerializationError),
#[error("SOCKSv5 deserialization error: {0}")] #[error("SOCKSv5 deserialization error: {0}")]
@@ -30,24 +31,24 @@ pub enum SOCKSv5Error {
ServerFailure(#[from] ServerResponseStatus), ServerFailure(#[from] ServerResponseStatus),
#[error("Connection error: {0}")] #[error("Connection error: {0}")]
ConnectionError(#[from] io::Error), ConnectionError(#[from] io::Error),
#[error("Underlying network error: {0}")]
UnderlyingNetwork(E),
} }
impl From<SOCKSv5Error> for ServerResponseStatus { impl<E: Debug + Display> IntoErrorResponse for SOCKSv5Error<E> {
fn from(x: SOCKSv5Error) -> Self { fn into_response(&self) -> ServerResponseStatus {
match x { match self {
SOCKSv5Error::ServerFailure(v) => v, SOCKSv5Error::ServerFailure(v) => v.clone(),
_ => ServerResponseStatus::GeneralFailure, _ => ServerResponseStatus::GeneralFailure,
} }
} }
} }
pub struct SOCKSv5Client<S, N> pub struct SOCKSv5Client<N: Networklike + Sync> {
where network: Arc<Mutex<N>>,
S: AsyncRead + AsyncWrite + Sync, login_info: LoginInfo,
N: Networklike + Sync, address: SOCKSv5Address,
{ port: u16,
network: N,
stream: S,
} }
pub struct LoginInfo { pub struct LoginInfo {
@@ -74,31 +75,75 @@ pub struct UsernamePassword {
pub password: String, pub password: String,
} }
impl<S, N> SOCKSv5Client<S, N> impl<N> SOCKSv5Client<N>
where where
S: AsyncRead + AsyncWrite + Send + Unpin + Sync,
N: Networklike + Sync, N: Networklike + Sync,
{ {
/// Create a new SOCKSv5 client connection over the given steam, using the given /// Create a new SOCKSv5 client connection over the given steam, using the given
/// authentication information. /// authentication information. As part of the process of building this object, we
pub async fn new(network: N, mut stream: S, login: &LoginInfo) -> Result<Self, SOCKSv5Error> { /// do a little test run to make sure that we can login effectively; this should save
let acceptable_methods = login.acceptable_methods(); /// from *some* surprises later on. If you'd rather *not* do that, though, you can
/// try `unchecked_new`.
pub async fn new<A: Into<SOCKSv5Address>>(
network: N,
login: LoginInfo,
server_addr: A,
server_port: u16,
) -> Result<Self, SOCKSv5Error<N::Error>> {
let base_version = SOCKSv5Client::unchecked_new(network, login, server_addr, server_port);
let _ = base_version.start_session().await?;
Ok(base_version)
}
/// Create a new SOCKSv5Client within the given parameters, but don't do a quick
/// check to see if this connection has a chance of working. This saves you a TCP
/// connection sequence at the expense of an increased possibility of an error
/// later on down the road.
pub fn unchecked_new<A: Into<SOCKSv5Address>>(
network: N,
login_info: LoginInfo,
address: A,
port: u16,
) -> Self {
SOCKSv5Client {
network: Arc::new(Mutex::new(network)),
login_info,
address: address.into(),
port,
}
}
/// This runs the connection and negotiates login, as required, and then returns
/// the stream the caller should use to do ... whatever it wants to do.
async fn start_session(&self) -> Result<GenericStream, SOCKSv5Error<N::Error>> {
// create the initial stream
let mut stream = {
let mut network = self.network.lock().await;
network.connect(self.address.clone(), self.port).await
}
.map_err(SOCKSv5Error::UnderlyingNetwork)?;
// compute how we can log in
let acceptable_methods = self.login_info.acceptable_methods();
trace!( trace!(
"Computed acceptable methods -- {:?} -- sending client greeting.", "Computed acceptable methods -- {:?} -- sending client greeting.",
acceptable_methods acceptable_methods
); );
// Negotiate with the server. Well. "Negotiate."
let client_greeting = ClientGreeting { acceptable_methods }; let client_greeting = ClientGreeting { acceptable_methods };
client_greeting.write(&mut stream).await?; client_greeting.write(&mut stream).await?;
trace!("Write client greeting, waiting for server's choice."); trace!("Write client greeting, waiting for server's choice.");
let server_choice = ServerChoice::read(&mut stream).await?; let server_choice = ServerChoice::read(&mut stream).await?;
trace!("Received server's choice: {}", server_choice.chosen_method); trace!("Received server's choice: {}", server_choice.chosen_method);
// Let's do it!
match server_choice.chosen_method { match server_choice.chosen_method {
AuthenticationMethod::None => {} AuthenticationMethod::None => {}
AuthenticationMethod::UsernameAndPassword => { AuthenticationMethod::UsernameAndPassword => {
let (username, password) = if let Some(ref linfo) = login.username_password { let (username, password) = if let Some(ref linfo) =
self.login_info.username_password
{
trace!("Server requested username/password, getting data from login info."); trace!("Server requested username/password, getting data from login info.");
(linfo.username.clone(), linfo.password.clone()) (linfo.username.clone(), linfo.password.clone())
} else { } else {
@@ -125,49 +170,58 @@ where
x => return Err(SOCKSv5Error::UnsupportedAuthMethodChosen(x)), x => return Err(SOCKSv5Error::UnsupportedAuthMethodChosen(x)),
} }
trace!("Returning new SOCKSv5Client object!"); Ok(stream)
Ok(SOCKSv5Client { }
network,
stream, /// Listen for one connection on the proxy server, and then wire back a socket
}) /// that can talk to whoever connects. This handshake is a little odd, because
/// we don't necessarily know what port or address we should tell the other
/// person to listen on. So this function takes an async function, which it
/// will pass this information to once it has it. It's up to that function,
/// then, to communicate this to its peer.
pub async fn remote_listen<A, E: Debug + Display>(
self,
_addr: A,
_port: u16,
) -> Result<GenericStream, SOCKSv5Error<E>>
where
A: Into<SOCKSv5Address>,
{
unimplemented!()
} }
} }
#[async_trait] #[async_trait]
impl<S, N> Networklike for SOCKSv5Client<S, N> impl<N> Networklike for SOCKSv5Client<N>
where where
S: AsyncRead + AsyncWrite + Send + Unpin + Sync,
N: Networklike + Sync + Send, N: Networklike + Sync + Send,
{ {
type Error = SOCKSv5Error; type Error = SOCKSv5Error<N::Error>;
async fn connect<A: Send + Into<SOCKSv5Address>>( async fn connect<A: Send + Into<SOCKSv5Address>>(
&mut self, &mut self,
addr: A, addr: A,
port: u16, port: u16,
) -> Result<GenericStream, Self::Error> { ) -> Result<GenericStream, Self::Error> {
let request = ClientConnectionRequest { let mut stream = self.start_session().await?;
let target = addr.into();
let ccr = ClientConnectionRequest {
command_code: ClientConnectionCommand::EstablishTCPStream, command_code: ClientConnectionCommand::EstablishTCPStream,
destination_address: addr.into(), destination_address: target.clone(),
destination_port: port, destination_port: port,
}; };
ccr.write(&mut stream).await?;
request.write(&mut self.stream).await?; let response = ServerResponse::read(&mut stream).await?;
let response = ServerResponse::read(&mut self.stream).await?;
if response.status == ServerResponseStatus::RequestGranted { if response.status == ServerResponseStatus::RequestGranted {
self.network info!(
.connect(response.bound_address, response.bound_port) "Proxy connection to {}:{} established; server is using {}:{}",
.await target, port, response.bound_address, response.bound_port
.map_err(|e| { );
SOCKSv5Error::ConnectionError(io::Error::new( Ok(stream)
io::ErrorKind::Other,
format!("{}", e),
))
})
} else { } else {
Err(SOCKSv5Error::ServerFailure(response.status)) Err(response.status.into())
} }
} }

View File

@@ -19,21 +19,17 @@ mod test {
#[test] #[test]
fn unrestricted_login() { fn unrestricted_login() {
task::block_on(async { task::block_on(async {
let mut network_stack = TestingStack::default(); let network_stack = TestingStack::default();
// generate the server // generate the server
let security_parameters = SecurityParameters::unrestricted(); let security_parameters = SecurityParameters::unrestricted();
let default_port = network_stack.listen("localhost", 9999).await.unwrap(); let server = SOCKSv5Server::new(network_stack.clone(), security_parameters);
let server = server.start("localhost", 9999).await.unwrap();
SOCKSv5Server::new(network_stack.clone(), security_parameters, default_port);
let _server_task = task::spawn(async move { server.run().await });
let stream = network_stack.connect("localhost", 9999).await.unwrap();
let login_info = LoginInfo { let login_info = LoginInfo {
username_password: None, username_password: None,
}; };
let client = SOCKSv5Client::new(network_stack, stream, &login_info).await; let client = SOCKSv5Client::new(network_stack, login_info, "localhost", 9999).await;
assert!(client.is_ok()); assert!(client.is_ok());
}) })
@@ -42,22 +38,18 @@ mod test {
#[test] #[test]
fn disallow_unrestricted() { fn disallow_unrestricted() {
task::block_on(async { task::block_on(async {
let mut network_stack = TestingStack::default(); let network_stack = TestingStack::default();
// generate the server // generate the server
let mut security_parameters = SecurityParameters::unrestricted(); let mut security_parameters = SecurityParameters::unrestricted();
security_parameters.allow_unauthenticated = false; security_parameters.allow_unauthenticated = false;
let default_port = network_stack.listen("localhost", 9999).await.unwrap(); let server = SOCKSv5Server::new(network_stack.clone(), security_parameters);
let server = server.start("localhost", 9998).await.unwrap();
SOCKSv5Server::new(network_stack.clone(), security_parameters, default_port);
let _server_task = task::spawn(async move { server.run().await });
let stream = network_stack.connect("localhost", 9999).await.unwrap();
let login_info = LoginInfo { let login_info = LoginInfo {
username_password: None, username_password: None,
}; };
let client = SOCKSv5Client::new(network_stack, stream, &login_info).await; let client = SOCKSv5Client::new(network_stack, login_info, "localhost", 9998).await;
assert!(client.is_err()); assert!(client.is_err());
}) })
@@ -66,7 +58,7 @@ mod test {
#[test] #[test]
fn password_checks() { fn password_checks() {
task::block_on(async { task::block_on(async {
let mut network_stack = TestingStack::default(); let network_stack = TestingStack::default();
// generate the server // generate the server
let security_parameters = SecurityParameters { let security_parameters = SecurityParameters {
@@ -77,32 +69,28 @@ mod test {
username == "awick" && password == "password" username == "awick" && password == "password"
}), }),
}; };
let default_port = network_stack.listen("localhost", 9999).await.unwrap(); let server = SOCKSv5Server::new(network_stack.clone(), security_parameters);
let server = server.start("localhost", 9997).await.unwrap();
SOCKSv5Server::new(network_stack.clone(), security_parameters, default_port);
let _server_task = task::spawn(async move { server.run().await });
// try the positive side // try the positive side
let stream = network_stack.connect("localhost", 9999).await.unwrap();
let login_info = LoginInfo { let login_info = LoginInfo {
username_password: Some(UsernamePassword { username_password: Some(UsernamePassword {
username: "awick".to_string(), username: "awick".to_string(),
password: "password".to_string(), password: "password".to_string(),
}), }),
}; };
let client = SOCKSv5Client::new(network_stack.clone(), stream, &login_info).await; let client =
SOCKSv5Client::new(network_stack.clone(), login_info, "localhost", 9997).await;
assert!(client.is_ok()); assert!(client.is_ok());
// try the negative side // try the negative side
let stream = network_stack.connect("localhost", 9999).await.unwrap();
let login_info = LoginInfo { let login_info = LoginInfo {
username_password: Some(UsernamePassword { username_password: Some(UsernamePassword {
username: "adamw".to_string(), username: "adamw".to_string(),
password: "password".to_string(), password: "password".to_string(),
}), }),
}; };
let client = SOCKSv5Client::new(network_stack, stream, &login_info).await; let client = SOCKSv5Client::new(network_stack, login_info, "localhost", 9997).await;
assert!(client.is_err()); assert!(client.is_err());
}) })
} }
@@ -110,22 +98,18 @@ mod test {
#[test] #[test]
fn firewall_blocks() { fn firewall_blocks() {
task::block_on(async { task::block_on(async {
let mut network_stack = TestingStack::default(); let network_stack = TestingStack::default();
// generate the server // generate the server
let mut security_parameters = SecurityParameters::unrestricted(); let mut security_parameters = SecurityParameters::unrestricted();
security_parameters.allow_connection = Some(|_, _| false); security_parameters.allow_connection = Some(|_, _| false);
let default_port = network_stack.listen("localhost", 9999).await.unwrap(); let server = SOCKSv5Server::new(network_stack.clone(), security_parameters);
let server = server.start("localhost", 9996).await.unwrap();
SOCKSv5Server::new(network_stack.clone(), security_parameters, default_port);
let _server_task = task::spawn(async move { server.run().await });
let stream = network_stack.connect("localhost", 9999).await.unwrap();
let login_info = LoginInfo { let login_info = LoginInfo {
username_password: None, username_password: None,
}; };
let client = SOCKSv5Client::new(network_stack, stream, &login_info).await; let client = SOCKSv5Client::new(network_stack, login_info, "localhost", 9996).await;
assert!(client.is_err()); assert!(client.is_err());
}) })
@@ -140,18 +124,14 @@ mod test {
// generate the server // generate the server
let security_parameters = SecurityParameters::unrestricted(); let security_parameters = SecurityParameters::unrestricted();
let default_port = network_stack.listen("localhost", 9999).await.unwrap(); let server = SOCKSv5Server::new(network_stack.clone(), security_parameters);
let server = server.start("localhost", 9995).await.unwrap();
SOCKSv5Server::new(network_stack.clone(), security_parameters, default_port);
let _server_task = task::spawn(async move { server.run().await });
let stream = network_stack.connect("localhost", 9999).await.unwrap();
let login_info = LoginInfo { let login_info = LoginInfo {
username_password: None, username_password: None,
}; };
let mut client = SOCKSv5Client::new(network_stack, stream, &login_info) let mut client = SOCKSv5Client::new(network_stack, login_info, "localhost", 9995)
.await .await
.unwrap(); .unwrap();

View File

@@ -1,3 +1,23 @@
//! An implementation of a SOCKSv5 server, parameterizable by the security parameters
//! and network stack you want to use. You should implement the server by first
//! setting up the `SecurityParameters`, then initializing the server object, and
//! then running it, as follows:
//!
//! ```
//! use async_socks5::network::Builtin;
//! use async_socks5::server::{SecurityParameters, SOCKSv5Server};
//! use std::io;
//!
//! async {
//! let parameters = SecurityParameters::new()
//! .password_check(|u,p| { u == "adam" && p == "evil" });
//! let network = Builtin::new();
//! let server = SOCKSv5Server::new(network, parameters);
//! server.start("localhost", 9999).await;
//! // ... do other stuff ...
//! };
//!
//! ```
use crate::errors::{AuthenticationError, DeserializationError, SerializationError}; use crate::errors::{AuthenticationError, DeserializationError, SerializationError};
use crate::messages::{ use crate::messages::{
AuthenticationMethod, ClientConnectionCommand, ClientConnectionRequest, ClientGreeting, AuthenticationMethod, ClientConnectionCommand, ClientConnectionRequest, ClientGreeting,
@@ -12,24 +32,69 @@ use async_std::io;
use async_std::io::prelude::WriteExt; use async_std::io::prelude::WriteExt;
use async_std::sync::{Arc, Mutex}; use async_std::sync::{Arc, Mutex};
use async_std::task; use async_std::task;
use futures::Stream;
use log::{error, info, trace, warn}; use log::{error, info, trace, warn};
use std::collections::HashMap;
use std::default::Default;
use std::fmt::{Debug, Display};
use thiserror::Error; use thiserror::Error;
/// A convenient bit of shorthand for an address and port
pub type AddressAndPort = (SOCKSv5Address, u16);
// Just some shorthand for us.
type ResultHandle = task::JoinHandle<Result<(), String>>;
/// A handle representing a SOCKSv5 server, parameterized by the underlying network
/// stack it runs over.
#[derive(Clone)]
pub struct SOCKSv5Server<N: Networklike> { pub struct SOCKSv5Server<N: Networklike> {
network: N, network: Arc<Mutex<N>>,
running_servers: Arc<Mutex<HashMap<AddressAndPort, ResultHandle>>>,
security_parameters: SecurityParameters, security_parameters: SecurityParameters,
listener: GenericListener<N::Error>,
} }
/// The security parameters that you can assign to the server, to make decisions
/// about the weirdos it accepts as users. It is recommended that you only use
/// wide open connections when you're 100% sure that the server will only be
/// accessible locally.
#[derive(Clone)] #[derive(Clone)]
pub struct SecurityParameters { pub struct SecurityParameters {
/// Allow completely unauthenticated connections. You should be very, very
/// careful about setting this to true, especially if you don't provide a
/// guard to ensure that you're getting connections from reasonable places.
pub allow_unauthenticated: bool, pub allow_unauthenticated: bool,
/// An optional function that can serve as a firewall for new connections.
/// Return true if the connection should be allowed to continue, false if
/// it shouldn't. This check happens before any data is read from or written
/// to the connecting party.
pub allow_connection: Option<fn(&SOCKSv5Address, u16) -> bool>, pub allow_connection: Option<fn(&SOCKSv5Address, u16) -> bool>,
/// An optional function to check a user name (first argument) and password
/// (second argument). Return true if the username / password is good, false
/// if not.
pub check_password: Option<fn(&str, &str) -> bool>, pub check_password: Option<fn(&str, &str) -> bool>,
/// An optional function to transition the stream from an unencrypted one to
/// an encrypted on. The assumption is you're using something like `rustls`
/// to make this happen; the exact mechanism is outside the scope of this
/// particular crate. If the connection shouldn't be allowed for some reason
/// (a bad certificate or handshake, for example), then return None; otherwise,
/// return the new stream.
pub connect_tls: Option<fn(GenericStream) -> Option<GenericStream>>, pub connect_tls: Option<fn(GenericStream) -> Option<GenericStream>>,
} }
impl SecurityParameters { impl SecurityParameters {
/// Generates a `SecurityParameters` object that's empty. It won't accept
/// anything, because it has no mechanisms it can use to actually authenticate
/// a user and yet won't allow unauthenticated connections.
pub fn new() -> SecurityParameters {
SecurityParameters {
allow_unauthenticated: false,
allow_connection: None,
check_password: None,
connect_tls: None,
}
}
/// Generates a `SecurityParameters` object that does not, in any way, /// Generates a `SecurityParameters` object that does not, in any way,
/// restrict who can log in. It also will not induce any transition into /// restrict who can log in. It also will not induce any transition into
/// TLS. Use this at your own risk ... or, really, just don't use this, /// TLS. Use this at your own risk ... or, really, just don't use this,
@@ -42,36 +107,127 @@ impl SecurityParameters {
connect_tls: None, connect_tls: None,
} }
} }
/// Use the provided function to check incoming connections before proceeding
/// with the rest of the handshake.
pub fn check_connections(
mut self,
checker: fn(&SOCKSv5Address, u16) -> bool,
) -> SecurityParameters {
self.allow_connection = Some(checker);
self
}
/// Use the provided function to check usernames and passwords provided
/// to the server.
pub fn password_check(mut self, checker: fn(&str, &str) -> bool) -> SecurityParameters {
self.check_password = Some(checker);
self
}
/// Use the provide function to validate a TLS connection, and transition it
/// to the new stream type. If the handshake fails, return `None` instead of
/// `Some`. (And maybe log it somewhere, you know.)
pub fn tls_converter(
mut self,
converter: fn(GenericStream) -> Option<GenericStream>,
) -> SecurityParameters {
self.connect_tls = Some(converter);
self
}
} }
impl<N: Networklike + Send + 'static> SOCKSv5Server<N> { impl Default for SecurityParameters {
pub fn new<S: Listenerlike<Error = N::Error> + 'static>( fn default() -> Self {
network: N, Self::new()
security_parameters: SecurityParameters, }
stream: S, }
) -> SOCKSv5Server<N> {
impl<N: Networklike + Clone + Send + 'static> SOCKSv5Server<N> {
/// Initialize a SOCKSv5 server for use later on. Once initialize, you can listen on
/// as many addresses and ports as you like; the metadata about the server will be
/// sync'd across all of the instances, should you want to gather that data for some
/// reason.
pub fn new(network: N, security_parameters: SecurityParameters) -> SOCKSv5Server<N> {
SOCKSv5Server { SOCKSv5Server {
network, network: Arc::new(Mutex::new(network)),
running_servers: Arc::new(Mutex::new(HashMap::new())),
security_parameters, security_parameters,
listener: GenericListener {
internal: Box::new(stream),
},
} }
} }
pub async fn run(self) -> Result<(), N::Error> { /// Start a server on the given address and port. This function returns when it has
let (my_addr, my_port) = self.listener.local_addr(); /// set up its listening socket, but spawns a separate task to actually wait for
/// connections. You can query which ones are still active, or see which ones have
/// failed, using some of the other items in this structure.
pub async fn start<A: Send + Into<SOCKSv5Address>>(
&self,
addr: A,
port: u16,
) -> Result<(), N::Error> {
// This might seem a little weird, but we do this in a separate block to make it
// as clear as possible to the borrow checker (and the reader) that we only want
// to hold the lock while we're actually calling listen.
let listener = {
let mut network = self.network.lock().await;
network.listen(addr, port).await
}?;
// this should really be the same as the input, but technically they could've
// thrown some zeros in there and let the underlying network stack decide. So
// we'll just pull this information post-initialization, and maybe get something
// a bit more detailed.
let (my_addr, my_port) = listener.local_addr();
info!("Starting SOCKSv5 server on {}:{}", my_addr, my_port); info!("Starting SOCKSv5 server on {}:{}", my_addr, my_port);
let locked_network = Arc::new(Mutex::new(self.network));
// OK, spawn off the server loop, and then we'll register this in our list of
// things running.
let new_self = self.clone();
let task_id = task::spawn(async move {
new_self
.server_loop(listener)
.await
.map_err(|x| format!("Server network error: {}", x))
});
let mut server_map = self.running_servers.lock().await;
server_map.insert((my_addr, my_port), task_id);
Ok(())
}
/// Provide a list of open sockets on the server.
pub async fn open_sockets(&self) -> Vec<AddressAndPort> {
let server_map = self.running_servers.lock().await;
server_map.keys().cloned().collect()
}
pub fn subserver_results(&mut self) -> impl Stream<Item = Result<(), String>> {
futures::stream::unfold(self.running_servers.clone(), |locked_map| async move {
let first_server = {
let mut server_map = locked_map.lock().await;
let first_key = server_map.keys().next().cloned()?;
server_map.remove(&first_key)
}?;
let result = first_server.await;
Some((result, locked_map))
})
}
async fn server_loop(self, listener: GenericListener<N::Error>) -> Result<(), N::Error> {
loop { loop {
let (stream, their_addr, their_port) = self.listener.accept().await?; let (stream, their_addr, their_port) = listener.accept().await?;
trace!( trace!(
"Initial accept of connection from {}:{}", "Initial accept of connection from {}:{}",
their_addr, their_addr,
their_port their_port
); );
// before we do anything, make sure this connection is cool. we don't want to
// waste resources (or parse any data) if this isn't someone we actually care
// about it.
if let Some(checker) = &self.security_parameters.allow_connection { if let Some(checker) = &self.security_parameters.allow_connection {
if !checker(&their_addr, their_port) { if !checker(&their_addr, their_port) {
info!( info!(
@@ -82,24 +238,182 @@ impl<N: Networklike + Send + 'static> SOCKSv5Server<N> {
} }
} }
let params = self.security_parameters.clone(); // throw this off into another task to take from here. We could to the rest
let network_mutex_copy = locked_network.clone(); // of this handshake here, but there's a chance that an adversarial connection
// could just stall us out, and keep us from doing the next connection. So ...
// we'll potentially spin off the task early.
let me_again = self.clone();
task::spawn(async move { task::spawn(async move {
match run_authentication(params, stream).await { me_again
Ok(authed_stream) => { .authenticate_step(their_addr, their_port, stream)
match run_main_loop(network_mutex_copy, authed_stream).await { .await;
Ok(_) => {}
Err(e) => warn!("Failure in main loop: {}", e),
}
}
Err(e) => warn!(
"Failure running authentication from {}:{}: {}",
their_addr, their_port, e
),
}
}); });
} }
} }
async fn authenticate_step(
self,
their_addr: SOCKSv5Address,
their_port: u16,
base_stream: GenericStream,
) {
// Turn this stream into one where we've authenticated the other side. Or, you
// know, don't, and just restart this loop.
let mut authenticated_stream =
match run_authentication(&self.security_parameters, base_stream).await {
Ok(authed_stream) => authed_stream,
Err(e) => {
warn!(
"Failure running authentication from {}:{}: {}",
their_addr, their_port, e
);
return;
}
};
// Figure out what the client actually wants from this connection, and
// then dispatch a task to deal with that.
let mccr = ClientConnectionRequest::read(&mut authenticated_stream).await;
match mccr {
Err(e) => warn!("Failure figuring out what the client wanted: {}", e),
Ok(ccr) => match ccr.command_code {
ClientConnectionCommand::AssociateUDPPort => self
.handle_udp_request(authenticated_stream, ccr, their_addr, their_port)
.await
.unwrap_or_else(|e| warn!("Internal server error in UDP association: {}", e)),
ClientConnectionCommand::EstablishTCPPortBinding => self
.handle_tcp_bind(authenticated_stream, ccr, their_addr, their_port)
.await
.unwrap_or_else(|e| warn!("Internal server error in TCP bind: {}", e)),
ClientConnectionCommand::EstablishTCPStream => self
.handle_tcp_forward(authenticated_stream, ccr, their_addr, their_port)
.await
.unwrap_or_else(|e| warn!("Internal server error in TCP forward: {}", e)),
},
}
}
async fn handle_udp_request(
self,
stream: GenericStream,
ccr: ClientConnectionRequest,
their_addr: SOCKSv5Address,
their_port: u16,
) -> Result<(), ServerError<N::Error>> {
// Let the user know that we're maybe making progress
let (my_addr, my_port) = stream.local_addr();
info!(
"[{}:{}] Handling UDP bind request from {}:{}, seeking to bind {}:{}",
my_addr, my_port, their_addr, their_port, ccr.destination_address, ccr.destination_port
);
unimplemented!()
}
async fn handle_tcp_forward(
self,
mut stream: GenericStream,
ccr: ClientConnectionRequest,
their_addr: SOCKSv5Address,
their_port: u16,
) -> Result<(), ServerError<N::Error>> {
// Let the user know that we're maybe making progress
let (my_addr, my_port) = stream.local_addr();
info!(
"[{}:{}] Handling TCP forward request from {}:{}, seeking to connect to {}:{}",
my_addr, my_port, their_addr, their_port, ccr.destination_address, ccr.destination_port
);
// OK, first thing's first: We need to actually connect to the server that the user
// wants us to connect to.
let connection_res = {
let mut network = self.network.lock().await;
network
.connect(ccr.destination_address.clone(), ccr.destination_port)
.await
};
let outgoing_stream = match connection_res {
Ok(x) => x,
Err(e) => {
error!("Failed to connect to {}: {}", ccr.destination_address, e);
let response = ServerResponse::error(&e);
response.write(&mut stream).await?;
return Err(ServerError::NetworkError(e));
}
};
trace!(
"Connection established to {}:{}",
ccr.destination_address,
ccr.destination_port
);
// Now, for whatever reason -- and this whole thing sent me down a garden path
// in understanding how this whole protocol works -- we tell the user what address
// and port we bound for that connection.
let (bound_address, bound_port) = outgoing_stream.local_addr();
let response = ServerResponse {
status: ServerResponseStatus::RequestGranted,
bound_address,
bound_port,
};
response.write(&mut stream).await?;
// Now that we've informed them of that, we set up one task to transfer information
// from the current stream (`stream`) to the connection (`outgoing_stream`), and
// another task that goes in the reverse direction.
//
// I've chosen to start two fresh tasks and let this one die; I'm not sure that
// this is the right approach. My only rationale is that this might let some
// memory we might have accumulated along the way drop more easily, but that
// might not actually matter.
let mut from_left = stream.clone();
let mut from_right = outgoing_stream.clone();
let mut to_left = stream;
let mut to_right = outgoing_stream;
let from = format!("{}:{}", their_addr, their_port);
let to = format!("{}:{}", ccr.destination_address, ccr.destination_port);
task::spawn(async move {
info!(
"Spawned {}:{} >--> {}:{} task",
their_addr, their_port, ccr.destination_address, ccr.destination_port
);
if let Err(e) = io::copy(&mut from_left, &mut to_right).await {
warn!(
"{}:{} >--> {}:{} connection failed with: {}",
their_addr, their_port, ccr.destination_address, ccr.destination_port, e
);
}
});
task::spawn(async move {
info!("Spawned {} <--< {} task", from, to);
if let Err(e) = io::copy(&mut from_right, &mut to_left).await {
warn!("{} <--< {} connection failed with: {}", from, to, e);
}
});
Ok(())
}
async fn handle_tcp_bind(
self,
stream: GenericStream,
ccr: ClientConnectionRequest,
their_addr: SOCKSv5Address,
their_port: u16,
) -> Result<(), ServerError<N::Error>> {
// Let the user know that we're maybe making progress
let (my_addr, my_port) = stream.local_addr();
info!(
"[{}:{}] Handling UDP bind request from {}:{}, seeking to bind {}:{}",
my_addr, my_port, their_addr, their_port, ccr.destination_address, ccr.destination_port
);
unimplemented!()
}
} }
#[allow(clippy::upper_case_acronyms)] #[allow(clippy::upper_case_acronyms)]
@@ -244,12 +558,12 @@ fn reasonable_auth_method_choices() {
} }
async fn run_authentication( async fn run_authentication(
params: SecurityParameters, params: &SecurityParameters,
mut stream: GenericStream, mut stream: GenericStream,
) -> Result<GenericStream, AuthenticationError> { ) -> Result<GenericStream, AuthenticationError> {
let greeting = ClientGreeting::read(&mut stream).await?; let greeting = ClientGreeting::read(&mut stream).await?;
match choose_authentication_method(&params, &greeting.acceptable_methods) { match choose_authentication_method(params, &greeting.acceptable_methods) {
// it's not us, it's you // it's not us, it's you
None => { None => {
trace!("Failed to find acceptable authentication method."); trace!("Failed to find acceptable authentication method.");
@@ -306,137 +620,11 @@ async fn run_authentication(
} }
#[derive(Error, Debug)] #[derive(Error, Debug)]
enum ServerError { pub enum ServerError<E: Debug + Display> {
#[error("Error in deserialization: {0}")] #[error("Error in deserialization: {0}")]
DeserializationError(#[from] DeserializationError), DeserializationError(#[from] DeserializationError),
#[error("Error in serialization: {0}")] #[error("Error in serialization: {0}")]
SerializationError(#[from] SerializationError), SerializationError(#[from] SerializationError),
} #[error("Underlying network error: {0}")]
NetworkError(E),
async fn run_main_loop<N>(
network: Arc<Mutex<N>>,
mut stream: GenericStream,
) -> Result<(), ServerError>
where
N: Networklike,
N::Error: 'static,
{
loop {
let ccr = ClientConnectionRequest::read(&mut stream).await?;
match ccr.command_code {
ClientConnectionCommand::AssociateUDPPort => {}
ClientConnectionCommand::EstablishTCPPortBinding => {}
ClientConnectionCommand::EstablishTCPStream => {
let target = format!("{}:{}", ccr.destination_address, ccr.destination_port);
info!(
"Client requested connection to {}:{}",
ccr.destination_address, ccr.destination_port
);
let connection_res = {
let mut network = network.lock().await;
network
.connect(ccr.destination_address.clone(), ccr.destination_port)
.await
};
let outgoing_stream = match connection_res {
Ok(x) => x,
Err(e) => {
error!("Failed to connect to {}: {}", target, e);
let response = ServerResponse::error(e);
response.write(&mut stream).await?;
continue;
}
};
trace!(
"Connection established to {}:{}",
ccr.destination_address,
ccr.destination_port
);
let incoming_res = {
let mut network = network.lock().await;
network.listen("127.0.0.1", 0).await
};
let incoming_listener = match incoming_res {
Ok(x) => x,
Err(e) => {
error!("Failed to bind server port for new TCP stream: {}", e);
let response = ServerResponse::error(e);
response.write(&mut stream).await?;
continue;
}
};
let (bound_address, bound_port) = incoming_listener.local_addr();
trace!(
"Set up {}:{} to address request for {}:{}",
bound_address,
bound_port,
ccr.destination_address,
ccr.destination_port
);
let response = ServerResponse {
status: ServerResponseStatus::RequestGranted,
bound_address,
bound_port,
};
response.write(&mut stream).await?;
task::spawn(async move {
let (incoming_stream, from_addr, from_port) = match incoming_listener
.accept()
.await
{
Err(e) => {
error!("Miscellaneous error waiting for someone to connect for proxying: {}", e);
return;
}
Ok(s) => s,
};
trace!(
"Accepted connection from {}:{} to attach to {}:{}",
from_addr,
from_port,
ccr.destination_address,
ccr.destination_port
);
let mut from_left = incoming_stream.clone();
let mut from_right = outgoing_stream.clone();
let mut to_left = incoming_stream;
let mut to_right = outgoing_stream;
let from = format!("{}:{}", from_addr, from_port);
let to = format!("{}:{}", ccr.destination_address, ccr.destination_port);
task::spawn(async move {
info!(
"Spawned {}:{} >--> {}:{} task",
from_addr, from_port, ccr.destination_address, ccr.destination_port
);
if let Err(e) = io::copy(&mut from_left, &mut to_right).await {
warn!(
"{}:{} >--> {}:{} connection failed with: {}",
from_addr,
from_port,
ccr.destination_address,
ccr.destination_port,
e
);
}
});
task::spawn(async move {
info!("Spawned {} <--< {} task", from, to);
if let Err(e) = io::copy(&mut from_right, &mut to_left).await {
warn!("{} <--< {} connection failed with: {}", from, to, e);
}
});
});
}
}
}
} }