First attempt at implementing remote TCP port binding.

This commit is contained in:
2021-12-20 20:40:25 -08:00
parent 3737d0739d
commit bac2c33aee
4 changed files with 184 additions and 44 deletions

View File

@@ -11,6 +11,7 @@ use crate::network::SOCKSv5Address;
use async_std::io;
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use futures::Future;
use log::{info, trace, warn};
use std::fmt::{Debug, Display};
use thiserror::Error;
@@ -55,7 +56,20 @@ pub struct LoginInfo {
pub username_password: Option<UsernamePassword>,
}
impl Default for LoginInfo {
fn default() -> Self {
Self::new()
}
}
impl LoginInfo {
/// Generate an empty bit of login information.
fn new() -> LoginInfo {
LoginInfo {
username_password: None,
}
}
/// Turn this information into a list of authentication methods that we can handle,
/// to send to the server. The RFC isn't super clear if the order of these matters
/// at all, but we'll try to keep it in our preferred order.
@@ -179,15 +193,52 @@ where
/// person to listen on. So this function takes an async function, which it
/// will pass this information to once it has it. It's up to that function,
/// then, to communicate this to its peer.
pub async fn remote_listen<A, E: Debug + Display>(
pub async fn remote_listen<A, Fut: Future<Output = Result<(), SOCKSv5Error<N::Error>>>>(
self,
_addr: A,
_port: u16,
) -> Result<GenericStream, SOCKSv5Error<E>>
addr: A,
port: u16,
callback: impl FnOnce(SOCKSv5Address, u16) -> Fut,
) -> Result<(SOCKSv5Address, u16, GenericStream), SOCKSv5Error<N::Error>>
where
A: Into<SOCKSv5Address>,
{
unimplemented!()
let mut stream = self.start_session().await?;
let target = addr.into();
let ccr = ClientConnectionRequest {
command_code: ClientConnectionCommand::EstablishTCPPortBinding,
destination_address: target.clone(),
destination_port: port,
};
ccr.write(&mut stream).await?;
let initial_response = ServerResponse::read(&mut stream).await?;
if initial_response.status != ServerResponseStatus::RequestGranted {
return Err(initial_response.status.into());
}
info!(
"Proxy port binding of {}:{} established; server listening on {}:{}",
target, port, initial_response.bound_address, initial_response.bound_port
);
callback(initial_response.bound_address, initial_response.bound_port).await?;
let secondary_response = ServerResponse::read(&mut stream).await?;
if secondary_response.status != ServerResponseStatus::RequestGranted {
return Err(secondary_response.status.into());
}
info!(
"Proxy bind got a connection from {}:{}",
secondary_response.bound_address, secondary_response.bound_port
);
Ok((
secondary_response.bound_address,
secondary_response.bound_port,
stream,
))
}
}

View File

@@ -12,6 +12,7 @@ mod test {
use crate::network::listener::Listenerlike;
use crate::network::testing::TestingStack;
use crate::server::{SOCKSv5Server, SecurityParameters};
use async_std::channel::bounded;
use async_std::io::prelude::WriteExt;
use async_std::task;
use futures::AsyncReadExt;
@@ -149,4 +150,43 @@ mod test {
assert_eq!(read_buffer, [1, 3, 3, 7]);
})
}
#[test]
fn bind_test() {
task::block_on(async {
let mut network_stack = TestingStack::default();
let security_parameters = SecurityParameters::unrestricted();
let server = SOCKSv5Server::new(network_stack.clone(), security_parameters);
server.start("localhost", 9994).await.unwrap();
let login_info = LoginInfo::default();
let client = SOCKSv5Client::new(network_stack.clone(), login_info, "localhost", 9994)
.await
.unwrap();
let (target_sender, target_receiver) = bounded(1);
task::spawn(async move {
let (_, _, mut conn) = client
.remote_listen("localhost", 9993, |addr, port| async move {
target_sender.send((addr, port)).await.unwrap();
Ok(())
})
.await
.unwrap();
conn.write_all(&[2, 3, 5, 7]).await.unwrap();
});
let (target_addr, target_port) = target_receiver.recv().await.unwrap();
let mut stream = network_stack
.connect(target_addr, target_port)
.await
.unwrap();
let mut read_buffer = [0; 4];
stream.read_exact(&mut read_buffer).await.unwrap();
assert_eq!(read_buffer, [2, 3, 5, 7]);
})
}
}

View File

@@ -9,6 +9,7 @@ use async_std::task;
#[cfg(test)]
use futures::io::Cursor;
use futures::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use log::debug;
#[cfg(test)]
use quickcheck::{quickcheck, Arbitrary, Gen};
#[cfg(test)]
@@ -34,8 +35,9 @@ impl ClientConnectionRequest {
) -> Result<Self, DeserializationError> {
let mut buffer = [0; 3];
debug!("Starting to read request.");
read_amt(r, 3, &mut buffer).await?;
debug!("Read three opening bytes: {:?}", buffer);
if buffer[0] != 5 {
return Err(DeserializationError::InvalidVersion(5, buffer[0]));
}
@@ -46,15 +48,18 @@ impl ClientConnectionRequest {
0x03 => ClientConnectionCommand::AssociateUDPPort,
x => return Err(DeserializationError::InvalidClientCommand(x)),
};
debug!("Command code: {:?}", command_code);
if buffer[2] != 0 {
return Err(DeserializationError::InvalidReservedByte(buffer[2]));
}
let destination_address = SOCKSv5Address::read(r).await?;
debug!("Destination address: {}", destination_address);
read_amt(r, 2, &mut buffer).await?;
let destination_port = ((buffer[0] as u16) << 8) + (buffer[1] as u16);
debug!("Destination port: {}", destination_port);
Ok(ClientConnectionRequest {
command_code,

View File

@@ -360,6 +360,76 @@ impl<N: Networklike + Clone + Send + 'static> SOCKSv5Server<N> {
};
response.write(&mut stream).await?;
// so now tie our streams together, and we're good to go
tie_streams(
format!("{}:{}", their_addr, their_port),
stream,
format!("{}:{}", ccr.destination_address, ccr.destination_port),
outgoing_stream,
)
.await;
Ok(())
}
async fn handle_tcp_bind(
self,
mut stream: GenericStream,
ccr: ClientConnectionRequest,
their_addr: SOCKSv5Address,
their_port: u16,
) -> Result<(), ServerError<N::Error>> {
// Let the user know that we're maybe making progress
let (my_addr, my_port) = stream.local_addr();
info!(
"[{}:{}] Handling TCP bind request from {}:{}, seeking to bind {}:{}",
my_addr, my_port, their_addr, their_port, ccr.destination_address, ccr.destination_port
);
// OK, we have to bind the darn socket first.
let port_binding = {
let mut network = self.network.lock().await;
network.listen(their_addr.clone(), their_port).await
}
.map_err(ServerError::NetworkError)?;
// Tell them what we bound, just in case they want to inform anyone.
let (bound_address, bound_port) = port_binding.local_addr();
let response = ServerResponse {
status: ServerResponseStatus::RequestGranted,
bound_address,
bound_port,
};
response.write(&mut stream).await?;
// Wait politely for someone to talk to us.
let (other, other_addr, other_port) = port_binding
.accept()
.await
.map_err(ServerError::NetworkError)?;
let info = ServerResponse {
status: ServerResponseStatus::RequestGranted,
bound_address: other_addr.clone(),
bound_port: other_port,
};
info.write(&mut stream).await?;
tie_streams(
format!("{}:{}", their_addr, their_port),
stream,
format!("{}:{}", other_addr, other_port),
other,
)
.await;
Ok(())
}
}
async fn tie_streams(
left_name: String,
left: GenericStream,
right_name: String,
right: GenericStream,
) {
// Now that we've informed them of that, we set up one task to transfer information
// from the current stream (`stream`) to the connection (`outgoing_stream`), and
// another task that goes in the reverse direction.
@@ -368,52 +438,26 @@ impl<N: Networklike + Clone + Send + 'static> SOCKSv5Server<N> {
// this is the right approach. My only rationale is that this might let some
// memory we might have accumulated along the way drop more easily, but that
// might not actually matter.
let mut from_left = stream.clone();
let mut from_right = outgoing_stream.clone();
let mut to_left = stream;
let mut to_right = outgoing_stream;
let from = format!("{}:{}", their_addr, their_port);
let to = format!("{}:{}", ccr.destination_address, ccr.destination_port);
let mut from_left = left.clone();
let mut from_right = right.clone();
let mut to_left = left;
let mut to_right = right;
let left_right_name = format!("{} >--> {}", left_name, right_name);
let right_left_name = format!("{} <--< {}", left_name, right_name);
task::spawn(async move {
info!(
"Spawned {}:{} >--> {}:{} task",
their_addr, their_port, ccr.destination_address, ccr.destination_port
);
info!("Spawned {} task", left_right_name);
if let Err(e) = io::copy(&mut from_left, &mut to_right).await {
warn!(
"{}:{} >--> {}:{} connection failed with: {}",
their_addr, their_port, ccr.destination_address, ccr.destination_port, e
);
warn!("{} connection failed with: {}", left_right_name, e);
}
});
task::spawn(async move {
info!("Spawned {} <--< {} task", from, to);
info!("Spawned {} task", right_left_name);
if let Err(e) = io::copy(&mut from_right, &mut to_left).await {
warn!("{} <--< {} connection failed with: {}", from, to, e);
warn!("{} connection failed with: {}", right_left_name, e);
}
});
Ok(())
}
async fn handle_tcp_bind(
self,
stream: GenericStream,
ccr: ClientConnectionRequest,
their_addr: SOCKSv5Address,
their_port: u16,
) -> Result<(), ServerError<N::Error>> {
// Let the user know that we're maybe making progress
let (my_addr, my_port) = stream.local_addr();
info!(
"[{}:{}] Handling UDP bind request from {}:{}, seeking to bind {}:{}",
my_addr, my_port, their_addr, their_port, ccr.destination_address, ccr.destination_port
);
unimplemented!()
}
}
#[allow(clippy::upper_case_acronyms)]