From 1d182a150f21d4de21cdf4b6320eec9dd154f194 Mon Sep 17 00:00:00 2001 From: Adam Wick Date: Tue, 22 Nov 2022 20:13:14 -0800 Subject: [PATCH] Checkpoint, or something. --- Cargo.toml | 13 +++ TODO | 2 + sample.toml | 15 +++ server/config.rs | 177 +++++++++++++++++++++++++++++++++ server/config/cmdline.rs | 35 +++++++ server/config/config_file.rs | 62 ++++++++++++ server/main.rs | 74 ++++++++++++++ src/bin/socks-server.rs | 3 - src/server.rs | 183 +++++++++++++++++------------------ 9 files changed, 467 insertions(+), 97 deletions(-) create mode 100644 TODO create mode 100644 sample.toml create mode 100644 server/config.rs create mode 100644 server/config/cmdline.rs create mode 100644 server/config/config_file.rs create mode 100644 server/main.rs delete mode 100644 src/bin/socks-server.rs diff --git a/Cargo.toml b/Cargo.toml index 309235b..5735780 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,12 +7,25 @@ edition = "2018" [lib] name = "async_socks5" +[[bin]] +name="socks5-server" +path="server/main.rs" + [dependencies] anyhow = "^1.0.57" +clap = { version = "^3.1.18", features = ["derive"] } +futures = "0.3.21" +if-addrs = "0.7.0" +lazy_static = "1.4.0" proptest = "^1.0.0" +serde = "^1.0.137" +serde_derive = "^1.0.137" thiserror = "^1.0.31" tokio = { version = "^1", features = ["full"] } +toml = "^0.5.9" tracing = "^0.1.34" +tracing-subscriber = { version = "^0.3.11", features = ["env-filter"] } +xdg = "2.4.1" [dev-dependencies] proptest = "1.0.0" diff --git a/TODO b/TODO new file mode 100644 index 0000000..4d415ce --- /dev/null +++ b/TODO @@ -0,0 +1,2 @@ + * [ ] Turn `write` from &self to self + * [ ] Turn `read`/`write` into a typeclass of some kind. diff --git a/sample.toml b/sample.toml new file mode 100644 index 0000000..fdde7e7 --- /dev/null +++ b/sample.toml @@ -0,0 +1,15 @@ +# Unless otherwise sepcified, use this log level +log_level = "TRACE" +# Unless the command line specifies servers (or includes --validate), +# start the following named servers. +start_servers = "loopback*,ethernet" + +[loopback4] +interface="lo0" +log_level="TRACE" +address="127.0.0.1" + +[loopback6] +interface="lo0" +log_level="DEBUG" +address="::1" \ No newline at end of file diff --git a/server/config.rs b/server/config.rs new file mode 100644 index 0000000..537d616 --- /dev/null +++ b/server/config.rs @@ -0,0 +1,177 @@ +mod cmdline; +mod config_file; + +use self::cmdline::Arguments; +use self::config_file::ConfigFile; +use clap::Parser; +use if_addrs::IfAddr; +use std::io; +use std::net::{AddrParseError, IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}; +use std::str::FromStr; +use thiserror::Error; +use tracing::metadata::LevelFilter; +use xdg::BaseDirectoriesError; + +#[derive(Debug, Error)] +pub enum ConfigError { + #[error(transparent)] + CommandLineError(#[from] clap::Error), + #[error("Error querying XDG base directories: {0}")] + XdgError(#[from] BaseDirectoriesError), + #[error(transparent)] + IOError(#[from] io::Error), + #[error("TOML processing error: {0}")] + TomlError(#[from] toml::de::Error), + #[error("Server '{0}' specifies an interface ({1}) with no addresses")] + NoAddressForInterface(String, String), + #[error("Server '{0}' specifies an address we couldn't parse: {1}")] + AddressParseError(String, AddrParseError), +} + +#[derive(Debug)] +pub struct Config { + pub log_level: LevelFilter, + pub server_definitions: Vec, +} + +#[derive(Debug)] +pub struct ServerDefinition { + pub name: String, + pub start: bool, + pub interface: Option, + pub address: SocketAddr, + pub log_level: LevelFilter, +} + +impl Config { + /// Generate a configuration by reading the command line arguments and any + /// defined config file, generating the actual arguments that we'll use for + /// operating the daemon. + pub fn derive() -> Result { + let command_line = Arguments::try_parse()?; + let mut config_file = ConfigFile::read(command_line.config_file)?; + let nic_addresses = if_addrs::get_if_addrs()?; + + let log_level = command_line + .log_level + .or(config_file.log_level) + .unwrap_or(LevelFilter::ERROR); + + let mut server_definitions = Vec::new(); + let servers_to_start: Vec = config_file + .start_servers + .map(|x| x.split(',').map(|v| v.to_string()).collect()) + .unwrap_or_default(); + + for (name, config_info) in config_file.servers.drain() { + let start = servers_to_start.contains(&name); + let log_level = config_info.log_level.unwrap_or(log_level); + let port = config_info.port.unwrap_or(1080); + let mut interface = None; + let mut address = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port)); + + match (config_info.interface, config_info.address) { + // if the user provides us nothing, we'll just use a blank address and + // no interface association + (None, None) => {} + + // if the user provides us an interface but no address, we'll see if we can + // find the interface and pull a reasonable address from it. + (Some(given_interface), None) => { + let mut found_it = false; + + for card_interface in nic_addresses.iter() { + if card_interface.name == given_interface { + interface = Some(given_interface.clone()); + address = SocketAddr::new(addr_convert(&card_interface.addr), port); + found_it = true; + break; + } + } + + if !found_it { + return Err(ConfigError::NoAddressForInterface(name, given_interface)); + } + } + + // if the user provides us an address but no interface, we'll quickly see if + // we can find that address in our interface list ... but we won't insist on + // it. + (None, Some(address_string)) => { + let read_address = IpAddr::from_str(&address_string) + .map_err(|x| ConfigError::AddressParseError(name.clone(), x))?; + + interface = None; + address = SocketAddr::new(read_address, port); + for card_interface in nic_addresses.iter() { + if addrs_match(&card_interface.addr, &read_address) { + interface = Some(card_interface.name.clone()); + break; + } + } + } + + // if the user provides both, we'll check to make sure that they match. + (Some(given_interface), Some(address_string)) => { + let read_address = IpAddr::from_str(&address_string) + .map_err(|x| ConfigError::AddressParseError(name.clone(), x))?; + let mut inferred_interface = None; + let mut good_to_go = false; + + address = SocketAddr::new(read_address, port); + for card_interface in nic_addresses.iter() { + if addrs_match(&card_interface.addr, &read_address) { + if card_interface.name == given_interface { + interface = Some(given_interface.clone()); + good_to_go = true; + break; + } else { + inferred_interface = Some(card_interface.name.clone()); + } + } + } + + if !good_to_go { + if let Some(inferred_interface) = inferred_interface { + tracing::warn!("Address {} is associated with interface {}, not {}; using it instead", read_address, inferred_interface, given_interface); + } else { + tracing::warn!( + "Address {} is not associated with interface {}, or any interface.", + read_address, + given_interface + ); + } + } + } + } + + server_definitions.push(ServerDefinition { + name, + start, + interface, + address, + log_level, + }); + } + + Ok(Config { + log_level, + server_definitions, + }) + } +} + +fn addr_convert(x: &if_addrs::IfAddr) -> IpAddr { + match x { + if_addrs::IfAddr::V4(x) => IpAddr::V4(x.ip), + if_addrs::IfAddr::V6(x) => IpAddr::V6(x.ip), + } +} + +fn addrs_match(x: &if_addrs::IfAddr, y: &IpAddr) -> bool { + match (x, y) { + (IfAddr::V4(x), IpAddr::V4(y)) => &x.ip == y, + (IfAddr::V6(x), IpAddr::V6(y)) => &x.ip == y, + _ => false, + } +} diff --git a/server/config/cmdline.rs b/server/config/cmdline.rs new file mode 100644 index 0000000..37db0e9 --- /dev/null +++ b/server/config/cmdline.rs @@ -0,0 +1,35 @@ +use clap::Parser; +use std::path::PathBuf; +use tracing::metadata::LevelFilter; + +#[derive(Parser, Debug)] +#[clap(author, version, about, long_about = None)] +pub struct Arguments { + #[clap( + short, + long, + help = "Use the given config file, rather than $XDG_CONFIG_DIR/socks5.toml" + )] + pub config_file: Option, + + #[clap( + short, + long, + help = "Default logging to the given level. (Defaults to ERROR if not given)" + )] + pub log_level: Option, + + #[clap( + short, + long, + help = "Start only the named server(s) from the config file. For more than one, use comma-separated values or multiple instances of --start" + )] + pub start: Vec, + + #[clap( + short, + long = "validate", + help = "Do not actually start any servers; just validate the config file." + )] + pub validate_only: bool, +} diff --git a/server/config/config_file.rs b/server/config/config_file.rs new file mode 100644 index 0000000..0d8f5ad --- /dev/null +++ b/server/config/config_file.rs @@ -0,0 +1,62 @@ +use super::ConfigError; +use serde::{Deserialize, Deserializer}; +use std::collections::HashMap; +use std::fs; +use std::path::PathBuf; +use tracing::metadata::LevelFilter; +use xdg::BaseDirectories; + +#[derive(serde_derive::Deserialize, Default)] +pub struct ConfigFile { + #[serde(deserialize_with = "parse_log_level")] + pub log_level: Option, + pub start_servers: Option, + #[serde(flatten)] + pub servers: HashMap, +} + +#[derive(serde_derive::Deserialize)] +pub struct ServerDefinition { + pub interface: Option, + #[serde(deserialize_with = "parse_log_level")] + pub log_level: Option, + pub address: Option, + pub port: Option, +} + +fn parse_log_level<'de, D>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + let possible_string: Option<&str> = Deserialize::deserialize(deserializer)?; + + if let Some(s) = possible_string { + Ok(Some(s.parse().map_err(|e| { + serde::de::Error::custom(format!("Couldn't parse log level '{}': {}", s, e)) + })?)) + } else { + Ok(None) + } +} + +impl ConfigFile { + pub fn read(mut config_file_path: Option) -> Result { + if config_file_path.is_none() { + let base_dirs = BaseDirectories::with_prefix("socks5")?; + let proposed_path = base_dirs.get_config_home(); + if let Ok(attributes) = fs::metadata(proposed_path.clone()) { + if attributes.is_file() { + config_file_path = Some(proposed_path); + } + } + } + + match config_file_path { + None => Ok(ConfigFile::default()), + Some(path) => { + let content = fs::read(path)?; + Ok(toml::from_slice(&content)?) + } + } + } +} diff --git a/server/main.rs b/server/main.rs new file mode 100644 index 0000000..1c84c23 --- /dev/null +++ b/server/main.rs @@ -0,0 +1,74 @@ +mod config; + +use async_socks5::server::{SOCKSv5Server, SecurityParameters}; +use config::Config; +use tracing::Instrument; +use tracing_subscriber::filter::EnvFilter; +use tracing_subscriber::fmt; +use tracing_subscriber::prelude::*; + +#[tokio::main] +async fn main() -> Result<(), anyhow::Error> { + let config = Config::derive()?; + + let fmt_layer = fmt::layer().with_target(false); + let filter_layer = EnvFilter::builder() + .with_default_directive(config.log_level.into()) + .from_env()?; + + tracing_subscriber::registry() + .with(filter_layer) + .with(fmt_layer) + .init(); + + tracing::trace!("Parsed configuration: {:?}", config); + + let core_server = SOCKSv5Server::new(SecurityParameters { + allow_unauthenticated: true, + allow_connection: None, + check_password: None, + connect_tls: None, + }); + + let mut running_servers = vec![]; + + for server_def in config.server_definitions { + let span = tracing::trace_span!( + "", + server_name = %server_def.name, + interface = ?server_def.interface, + address = %server_def.address, + ); + + let result = core_server + .start(server_def.address.ip(), server_def.address.port()) + .instrument(span) + .await; + + match result { + Ok(x) => running_servers.push(x), + Err(e) => tracing::error!( + server = %server_def.name, + interface = ?server_def.interface, + address = %server_def.address, + "Failure in launching server: {}", + e + ), + } + } + + while !running_servers.is_empty() { + let (initial_result, _idx, next_runners) = + futures::future::select_all(running_servers).await; + + match initial_result { + Ok(Ok(())) => tracing::info!("server completed successfully"), + Ok(Err(e)) => tracing::error!("error in running server: {}", e), + Err(e) => tracing::error!("error joining server: {}", e), + } + + running_servers = next_runners; + } + + Ok(()) +} diff --git a/src/bin/socks-server.rs b/src/bin/socks-server.rs deleted file mode 100644 index c1e5c88..0000000 --- a/src/bin/socks-server.rs +++ /dev/null @@ -1,3 +0,0 @@ -fn main() -> Result<(), ()> { - Ok(()) -} diff --git a/src/server.rs b/src/server.rs index ed7a521..2c8d533 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,5 +1,3 @@ -use std::net::SocketAddr; - use crate::address::SOCKSv5Address; use crate::messages::{ AuthenticationMethod, ClientConnectionCommand, ClientConnectionCommandReadError, @@ -8,14 +6,24 @@ use crate::messages::{ ServerAuthResponse, ServerAuthResponseWriteError, ServerChoice, ServerChoiceWriteError, ServerResponse, ServerResponseStatus, ServerResponseWriteError, }; -use crate::security_parameters::SecurityParameters; +pub use crate::security_parameters::SecurityParameters; +use std::net::SocketAddr; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; use thiserror::Error; use tokio::io::{copy_bidirectional, AsyncWriteExt}; use tokio::net::{TcpListener, TcpSocket, TcpStream, UdpSocket}; +use tokio::task::JoinHandle; +use tracing::{field, info_span, Instrument, Span}; #[derive(Clone)] pub struct SOCKSv5Server { + info: Arc, +} + +struct ServerInfo { security_parameters: SecurityParameters, + next_id: AtomicU64, } #[derive(Clone, Debug, Error, PartialEq)] @@ -54,7 +62,10 @@ impl SOCKSv5Server { /// be synced across all the instances. pub fn new(security_parameters: SecurityParameters) -> Self { SOCKSv5Server { - security_parameters, + info: Arc::new(ServerInfo { + security_parameters, + next_id: AtomicU64::new(1), + }), } } @@ -70,7 +81,7 @@ impl SOCKSv5Server { &self, addr: A, port: u16, - ) -> Result<(), std::io::Error> { + ) -> Result>, std::io::Error> { let listener = match addr.into() { SOCKSv5Address::IP4(x) => TcpListener::bind((x, port)).await?, SOCKSv5Address::IP6(x) => TcpListener::bind((x, port)).await?, @@ -86,45 +97,52 @@ impl SOCKSv5Server { let second_life = self.clone(); - tokio::task::spawn(async move { - if let Err(e) = second_life.server_loop(listener).await { - tracing::error!( - "{}:{}: server network error: {}", - sockaddr.ip(), - sockaddr.port(), - e - ); - } - }); - - Ok(()) + Ok(tokio::task::spawn(async move { + second_life.server_loop(listener).await + })) } /// Run the server loop for a particular listener. This routine will never actually /// return except in error conditions. async fn server_loop(self, listener: TcpListener) -> Result<(), std::io::Error> { + let local_addr = listener.local_addr()?; loop { let (socket, their_addr) = listener.accept().await?; + let accepted_span = info_span!( + "session", + server_address=?local_addr, + remote_address=?their_addr, + auth_method=field::Empty, + ident=field::Empty, + ); - // before we do anything of note, make sure this connection is cool. we don't want - // to waste any resources (and certainly don't want to handle any data!) if this - // isn't someone we want to accept connections from. - tracing::trace!("Initial accept of connection from {}", their_addr); - if let Some(checker) = self.security_parameters.allow_connection { - if !checker(&their_addr) { - tracing::info!("Rejecting attempted connection from {}", their_addr,); - } - continue; - } - - // continue this work in another task. we could absolutely do this work here, - // but just in case someone starts doing slow responses (or other nasty things), - // we want to make sure that that doesn't slow down our ability to accept other - // requests. - let me_again = self.clone(); - tokio::task::spawn(async move { - if let Err(e) = me_again.start_authentication(their_addr, socket).await { - tracing::error!("{}: server handler failure: {}", their_addr, e); + accepted_span.in_scope(|| { + // before we do anything of note, make sure this connection is cool. we don't want + // to waste any resources (and certainly don't want to handle any data!) if this + // isn't someone we want to accept connections from. + if let Some(checker) = self.info.security_parameters.allow_connection { + if !checker(&their_addr) { + tracing::info!("Rejecting attempted connection from {}", their_addr,); + } + } else { + // continue this work in another task. we could absolutely do this work here, + // but just in case someone starts doing slow responses (or other nasty things), + // we want to make sure that that doesn't slow down our ability to accept other + // requests. + let span_again = accepted_span.clone(); + let me_again = self.clone(); + tokio::task::spawn(async move { + let session_identifier = + me_again.info.next_id.fetch_add(1, Ordering::SeqCst); + span_again.record("ident", &session_identifier); + if let Err(e) = me_again + .start_authentication(their_addr, socket) + .instrument(span_again) + .await + { + tracing::error!("{}: server handler failure: {}", their_addr, e); + } + }); } }); } @@ -139,8 +157,10 @@ impl SOCKSv5Server { ) -> Result<(), SOCKSv5ServerError> { let greeting = ClientGreeting::read(&mut socket).await?; - match choose_authentication_method(&self.security_parameters, &greeting.acceptable_methods) - { + match choose_authentication_method( + &self.info.security_parameters, + &greeting.acceptable_methods, + ) { // it's not us, it's you. (we're just going to say no.) None => { tracing::trace!( @@ -157,6 +177,7 @@ impl SOCKSv5Server { // the gold standard. great choice. Some(ChosenMethod::TLS(_converter)) => { + Span::current().record("auth_method", &"TLS"); unimplemented!() } @@ -176,6 +197,7 @@ impl SOCKSv5Server { let its_all_good = ServerAuthResponse::success(); its_all_good.write(&mut socket).await?; socket.flush().await?; + Span::current().record("auth_method", &"password"); self.choose_mode(socket, their_addr).await } else { let yeah_no = ServerAuthResponse::failure(); @@ -189,13 +211,10 @@ impl SOCKSv5Server { // Um. I guess we're doing this unchecked. Yay? Some(ChosenMethod::None) => { - tracing::trace!( - "{}: Just skipping the whole authentication thing.", - their_addr, - ); let nothin_i_guess = ServerChoice::option(AuthenticationMethod::None); nothin_i_guess.write(&mut socket).await?; socket.flush().await?; + Span::current().record("auth_method", &"unauthenticated"); self.choose_mode(socket, their_addr).await } } @@ -214,7 +233,7 @@ impl SOCKSv5Server { self.handle_udp_request(socket, their_addr, ccr).await? } ClientConnectionCommand::EstablishTCPStream => { - self.handle_tcp_request(socket, their_addr, ccr).await? + self.handle_tcp_request(socket, ccr).await? } ClientConnectionCommand::EstablishTCPPortBinding => { self.handle_tcp_binding_request(socket, their_addr, ccr) @@ -272,15 +291,11 @@ impl SOCKSv5Server { async fn handle_tcp_request( self, mut stream: TcpStream, - their_addr: SocketAddr, ccr: ClientConnectionRequest, ) -> Result<(), SOCKSv5ServerError> { // Let the user know that we're maybe making progress - let my_addr = stream.local_addr()?; tracing::info!( - "[{}] Handling TCP forward request from {}, seeking to connect to {}:{}", - my_addr, - their_addr, + "Handling TCP forward request to {}:{}", ccr.destination_address, ccr.destination_port ); @@ -295,10 +310,10 @@ impl SOCKSv5Server { } }; - tracing::trace!( - "Connection established to {}:{}", - ccr.destination_address, - ccr.destination_port + let tcp_forwarding_span = info_span!( + "tcp_forwarding", + target_address=?ccr.destination_address, + target_port=ccr.destination_port ); // Now, for whatever reason -- and this whole thing sent me down a garden path @@ -310,10 +325,15 @@ impl SOCKSv5Server { bound_address: bound_address.ip().into(), bound_port: bound_address.port(), }; - response.write(&mut stream).await?; + response + .write(&mut stream) + .instrument(tcp_forwarding_span.clone()) + .await?; // so now tie our streams together, and we're good to go - tie_streams(stream, outgoing_stream).await; + tie_streams(stream, outgoing_stream) + .instrument(tcp_forwarding_span) + .await; Ok(()) } @@ -369,47 +389,22 @@ impl SOCKSv5Server { } async fn tie_streams(mut left: TcpStream, mut right: TcpStream) { - let left_local_addr = left - .local_addr() - .expect("couldn't get left local address in tie_streams"); - let left_peer_addr = left - .peer_addr() - .expect("couldn't get left peer address in tie_streams"); - let right_local_addr = right - .local_addr() - .expect("couldn't get right local address in tie_streams"); - let right_peer_addr = right - .peer_addr() - .expect("couldn't get right peer address in tie_streams"); - - tokio::task::spawn(async move { - tracing::info!( - "Setting up linkage {}/{} <-> {}/{}", - left_peer_addr, - left_local_addr, - right_local_addr, - right_peer_addr - ); - match copy_bidirectional(&mut left, &mut right).await { - Ok((l2r, r2l)) => tracing::info!( - "Shutting down linkage {}/{} <-> {}/{} (sent {} and {} bytes, respectively)", - left_peer_addr, - left_local_addr, - right_local_addr, - right_peer_addr, - l2r, - r2l - ), - Err(e) => tracing::warn!( - "Shutting down linkage {}/{} <-> {}/{} with error: {}", - left_peer_addr, - left_local_addr, - right_local_addr, - right_peer_addr, - e - ), + let span_copy = Span::current(); + tracing::info!("linking forwarding streams"); + tokio::task::spawn( + async move { + match copy_bidirectional(&mut left, &mut right) + .instrument(span_copy) + .await + { + Ok((l2r, r2l)) => { + tracing::info!(sent = l2r, received = r2l, "shutting down streams") + } + Err(e) => tracing::warn!("Linked streams shut down with error: {}", e), + } } - }); + .instrument(Span::current()), + ); } #[allow(clippy::upper_case_acronyms)]