diff --git a/src/client.rs b/src/client.rs index 6003c3e..3836295 100644 --- a/src/client.rs +++ b/src/client.rs @@ -11,6 +11,7 @@ use crate::network::SOCKSv5Address; use async_std::io; use async_std::sync::{Arc, Mutex}; use async_trait::async_trait; +use futures::Future; use log::{info, trace, warn}; use std::fmt::{Debug, Display}; use thiserror::Error; @@ -55,7 +56,20 @@ pub struct LoginInfo { pub username_password: Option<UsernamePassword>, } +impl Default for LoginInfo { + fn default() -> Self { + Self::new() + } +} + impl LoginInfo { + /// Generate an empty bit of login information. + fn new() -> LoginInfo { + LoginInfo { + username_password: None, + } + } + /// Turn this information into a list of authentication methods that we can handle, /// to send to the server. The RFC isn't super clear if the order of these matters /// at all, but we'll try to keep it in our preferred order. @@ -179,15 +193,52 @@ where /// person to listen on. So this function takes an async function, which it /// will pass this information to once it has it. It's up to that function, /// then, to communicate this to its peer. 
- pub async fn remote_listen( + pub async fn remote_listen<A, Fut: Future<Output = Result<(), SOCKSv5Error<N::Error>>>>( self, - _addr: A, - _port: u16, - ) -> Result<GenericStream, SOCKSv5Error<N::Error>> + addr: A, + port: u16, + callback: impl FnOnce(SOCKSv5Address, u16) -> Fut, + ) -> Result<(SOCKSv5Address, u16, GenericStream), SOCKSv5Error<N::Error>> where A: Into<SOCKSv5Address>, { - unimplemented!() + let mut stream = self.start_session().await?; + let target = addr.into(); + let ccr = ClientConnectionRequest { + command_code: ClientConnectionCommand::EstablishTCPPortBinding, + destination_address: target.clone(), + destination_port: port, + }; + + ccr.write(&mut stream).await?; + + let initial_response = ServerResponse::read(&mut stream).await?; + if initial_response.status != ServerResponseStatus::RequestGranted { + return Err(initial_response.status.into()); + } + + info!( + "Proxy port binding of {}:{} established; server listening on {}:{}", + target, port, initial_response.bound_address, initial_response.bound_port + ); + + callback(initial_response.bound_address, initial_response.bound_port).await?; + + let secondary_response = ServerResponse::read(&mut stream).await?; + if secondary_response.status != ServerResponseStatus::RequestGranted { + return Err(secondary_response.status.into()); + } + + info!( + "Proxy bind got a connection from {}:{}", + secondary_response.bound_address, secondary_response.bound_port + ); + + Ok(( + secondary_response.bound_address, + secondary_response.bound_port, + stream, + )) + } } diff --git a/src/lib.rs b/src/lib.rs index 6482794..66bdf88 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,7 @@ mod test { use crate::network::listener::Listenerlike; use crate::network::testing::TestingStack; use crate::server::{SOCKSv5Server, SecurityParameters}; + use async_std::channel::bounded; use async_std::io::prelude::WriteExt; use async_std::task; use futures::AsyncReadExt; @@ -149,4 +150,43 @@ mod test { assert_eq!(read_buffer, [1, 3, 3, 7]); }) } + + #[test] + fn bind_test() { + task::block_on(async { + let mut network_stack = 
TestingStack::default(); + + let security_parameters = SecurityParameters::unrestricted(); + let server = SOCKSv5Server::new(network_stack.clone(), security_parameters); + server.start("localhost", 9994).await.unwrap(); + + let login_info = LoginInfo::default(); + let client = SOCKSv5Client::new(network_stack.clone(), login_info, "localhost", 9994) + .await + .unwrap(); + + let (target_sender, target_receiver) = bounded(1); + + task::spawn(async move { + let (_, _, mut conn) = client + .remote_listen("localhost", 9993, |addr, port| async move { + target_sender.send((addr, port)).await.unwrap(); + Ok(()) + }) + .await + .unwrap(); + + conn.write_all(&[2, 3, 5, 7]).await.unwrap(); + }); + + let (target_addr, target_port) = target_receiver.recv().await.unwrap(); + let mut stream = network_stack + .connect(target_addr, target_port) + .await + .unwrap(); + let mut read_buffer = [0; 4]; + stream.read_exact(&mut read_buffer).await.unwrap(); + assert_eq!(read_buffer, [2, 3, 5, 7]); + }) + } } diff --git a/src/messages/client_command.rs b/src/messages/client_command.rs index 44ac669..e1ddb63 100644 --- a/src/messages/client_command.rs +++ b/src/messages/client_command.rs @@ -9,6 +9,7 @@ use async_std::task; #[cfg(test)] use futures::io::Cursor; use futures::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use log::debug; #[cfg(test)] use quickcheck::{quickcheck, Arbitrary, Gen}; #[cfg(test)] @@ -34,8 +35,9 @@ impl ClientConnectionRequest { ) -> Result<ClientConnectionRequest, DeserializationError> { let mut buffer = [0; 3]; + debug!("Starting to read request."); read_amt(r, 3, &mut buffer).await?; - + debug!("Read three opening bytes: {:?}", buffer); if buffer[0] != 5 { return Err(DeserializationError::InvalidVersion(5, buffer[0])); } @@ -46,15 +48,18 @@ impl ClientConnectionRequest { 0x03 => ClientConnectionCommand::AssociateUDPPort, x => return Err(DeserializationError::InvalidClientCommand(x)), }; + debug!("Command code: {:?}", command_code); if buffer[2] != 0 { return 
Err(DeserializationError::InvalidReservedByte(buffer[2])); } let destination_address = SOCKSv5Address::read(r).await?; + debug!("Destination address: {}", destination_address); read_amt(r, 2, &mut buffer).await?; let destination_port = ((buffer[0] as u16) << 8) + (buffer[1] as u16); + debug!("Destination port: {}", destination_port); Ok(ClientConnectionRequest { command_code, diff --git a/src/server.rs b/src/server.rs index c442e26..0976a6f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -360,47 +360,20 @@ impl SOCKSv5Server { }; response.write(&mut stream).await?; - // Now that we've informed them of that, we set up one task to transfer information - // from the current stream (`stream`) to the connection (`outgoing_stream`), and - // another task that goes in the reverse direction. - // - // I've chosen to start two fresh tasks and let this one die; I'm not sure that - // this is the right approach. My only rationale is that this might let some - // memory we might have accumulated along the way drop more easily, but that - // might not actually matter. 
- let mut from_left = stream.clone(); - let mut from_right = outgoing_stream.clone(); - let mut to_left = stream; - let mut to_right = outgoing_stream; - let from = format!("{}:{}", their_addr, their_port); - let to = format!("{}:{}", ccr.destination_address, ccr.destination_port); - - task::spawn(async move { - info!( - "Spawned {}:{} >--> {}:{} task", - their_addr, their_port, ccr.destination_address, ccr.destination_port - ); - if let Err(e) = io::copy(&mut from_left, &mut to_right).await { - warn!( - "{}:{} >--> {}:{} connection failed with: {}", - their_addr, their_port, ccr.destination_address, ccr.destination_port, e - ); - } - }); - - task::spawn(async move { - info!("Spawned {} <--< {} task", from, to); - if let Err(e) = io::copy(&mut from_right, &mut to_left).await { - warn!("{} <--< {} connection failed with: {}", from, to, e); - } - }); - + // so now tie our streams together, and we're good to go + tie_streams( + format!("{}:{}", their_addr, their_port), + stream, + format!("{}:{}", ccr.destination_address, ccr.destination_port), + outgoing_stream, + ) + .await; Ok(()) } async fn handle_tcp_bind( self, - stream: GenericStream, + mut stream: GenericStream, ccr: ClientConnectionRequest, their_addr: SOCKSv5Address, their_port: u16, @@ -408,14 +381,85 @@ impl SOCKSv5Server { // Let the user know that we're maybe making progress let (my_addr, my_port) = stream.local_addr(); info!( - "[{}:{}] Handling UDP bind request from {}:{}, seeking to bind {}:{}", + "[{}:{}] Handling TCP bind request from {}:{}, seeking to bind {}:{}", my_addr, my_port, their_addr, their_port, ccr.destination_address, ccr.destination_port ); - unimplemented!() + // OK, we have to bind the darn socket first. + let port_binding = { + let mut network = self.network.lock().await; + network.listen(their_addr.clone(), their_port).await + } + .map_err(ServerError::NetworkError)?; + + // Tell them what we bound, just in case they want to inform anyone. 
+ let (bound_address, bound_port) = port_binding.local_addr(); + let response = ServerResponse { + status: ServerResponseStatus::RequestGranted, + bound_address, + bound_port, + }; + response.write(&mut stream).await?; + + // Wait politely for someone to talk to us. + let (other, other_addr, other_port) = port_binding + .accept() + .await + .map_err(ServerError::NetworkError)?; + let info = ServerResponse { + status: ServerResponseStatus::RequestGranted, + bound_address: other_addr.clone(), + bound_port: other_port, + }; + info.write(&mut stream).await?; + + tie_streams( + format!("{}:{}", their_addr, their_port), + stream, + format!("{}:{}", other_addr, other_port), + other, + ) + .await; + Ok(()) } } +async fn tie_streams( + left_name: String, + left: GenericStream, + right_name: String, + right: GenericStream, +) { + // Now that we've informed them of that, we set up one task to transfer information + // from the current stream (`stream`) to the connection (`outgoing_stream`), and + // another task that goes in the reverse direction. + // + // I've chosen to start two fresh tasks and let this one die; I'm not sure that + // this is the right approach. My only rationale is that this might let some + // memory we might have accumulated along the way drop more easily, but that + // might not actually matter. 
+ let mut from_left = left.clone(); + let mut from_right = right.clone(); + let mut to_left = left; + let mut to_right = right; + let left_right_name = format!("{} >--> {}", left_name, right_name); + let right_left_name = format!("{} <--< {}", left_name, right_name); + + task::spawn(async move { + info!("Spawned {} task", left_right_name); + if let Err(e) = io::copy(&mut from_left, &mut to_right).await { + warn!("{} connection failed with: {}", left_right_name, e); + } + }); + + task::spawn(async move { + info!("Spawned {} task", right_left_name); + if let Err(e) = io::copy(&mut from_right, &mut to_left).await { + warn!("{} connection failed with: {}", right_left_name, e); + } + }); +} + #[allow(clippy::upper_case_acronyms)] enum ChosenMethod { TLS(fn(GenericStream) -> Option<GenericStream>),