From a2c57e4c76a34dba2490cfc9084341873edcbd69 Mon Sep 17 00:00:00 2001 From: Adam Wick Date: Mon, 4 Oct 2021 15:01:56 -0700 Subject: [PATCH] Add a prototype testing network stack, that requires no actual network connection. --- .gitignore | 3 +- src/network.rs | 1 + src/network/address.rs | 2 +- src/network/testing.rs | 276 ++++++++++++++++++++++++++++++++ src/network/testing/datagram.rs | 88 ++++++++++ src/network/testing/stream.rs | 145 +++++++++++++++++ 6 files changed, 513 insertions(+), 2 deletions(-) create mode 100644 src/network/testing.rs create mode 100644 src/network/testing/datagram.rs create mode 100644 src/network/testing/stream.rs diff --git a/.gitignore b/.gitignore index 53715f4..eb91c1f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target Cargo.lock -tarpaulin-report.html \ No newline at end of file +tarpaulin-report.html +launch.json diff --git a/src/network.rs b/src/network.rs index bc10e44..3a277d0 100644 --- a/src/network.rs +++ b/src/network.rs @@ -4,6 +4,7 @@ pub mod generic; pub mod listener; pub mod standard; pub mod stream; +pub mod testing; use crate::messages::ServerResponseStatus; pub use crate::network::address::SOCKSv5Address; diff --git a/src/network/address.rs b/src/network/address.rs index 3db9788..eeecd6f 100644 --- a/src/network/address.rs +++ b/src/network/address.rs @@ -16,7 +16,7 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::pin::Pin; use thiserror::Error; -#[derive(Clone, Debug, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, Hash, PartialEq)] pub enum SOCKSv5Address { IP4(Ipv4Addr), IP6(Ipv6Addr), diff --git a/src/network/testing.rs b/src/network/testing.rs new file mode 100644 index 0000000..55aeb78 --- /dev/null +++ b/src/network/testing.rs @@ -0,0 +1,276 @@ +mod datagram; +mod stream; + +use crate::messages::ServerResponseStatus; +use crate::network::address::{HasLocalAddress, SOCKSv5Address}; +#[cfg(test)] +use crate::network::datagram::Datagramlike; +use crate::network::datagram::GenericDatagramSocket; +use crate::network::generic::Networklike; +use crate::network::listener::{GenericListener, Listenerlike}; +use crate::network::stream::GenericStream; +use crate::network::testing::datagram::TestDatagram; +use crate::network::testing::stream::TestingStream; +use async_std::channel::{bounded, Receiver, Sender}; +use async_std::sync::{Arc, Mutex}; +#[cfg(test)] +use async_std::task; +use async_trait::async_trait; +#[cfg(test)] +use futures::{AsyncReadExt, AsyncWriteExt}; +use std::collections::HashMap; +use std::fmt; + +/// A "network", based purely on internal Rust datatypes, for testing +/// networking code. This stack operates purely in memory, so shouldn't +/// suffer from any weird networking effects ... which makes it a good +/// functional test, but not great at actually testing real-world failure +/// modes. +#[allow(clippy::type_complexity)] +#[derive(Clone)] +pub struct TestingStack { + tcp_listeners: Arc>>>, + udp_sockets: Arc)>>>>, + next_random_socket: u16, +} + +impl TestingStack { + pub fn new() -> TestingStack { + TestingStack { + tcp_listeners: Arc::new(Mutex::new(HashMap::new())), + udp_sockets: Arc::new(Mutex::new(HashMap::new())), + next_random_socket: 23, + } + } +} + +impl Default for TestingStack { + fn default() -> Self { + Self::new() + } +} + +#[derive(Debug)] +pub enum TestStackError { + AcceptFailed, + AddressBusy(SOCKSv5Address, u16), + ConnectionFailed, + FailureToSend, + NoTCPHostFound(SOCKSv5Address, u16), + ReceiveFailure, +} + +impl fmt::Display for TestStackError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + TestStackError::AcceptFailed => write!(f, "Accept failed; the other side died (?)"), + TestStackError::AddressBusy(ref addr, port) => { + write!(f, "Address {}:{} already in use", addr, port) + } + TestStackError::ConnectionFailed => write!(f, "Couldn't connect to host."), + TestStackError::FailureToSend => write!( + f, + "Weird internal error in testing infrastructure; channel send failed" + ), + TestStackError::NoTCPHostFound(ref addr, port) => { + write!(f, "No host found at {} for TCP port {}", addr, port) + } + TestStackError::ReceiveFailure => { + write!(f, "Failed to process a UDP receive (this is weird)") + } + } + } +} + +impl From for ServerResponseStatus { + fn from(_: TestStackError) -> Self { + ServerResponseStatus::GeneralFailure + } +} + +#[async_trait] +impl Networklike for TestingStack { + type Error = TestStackError; + + async fn connect>( + &mut self, + addr: A, + port: u16, + ) -> Result { + let table = self.tcp_listeners.lock().await; + let target = addr.into(); + + match table.get(&(target.clone(), port)) { + None => Err(TestStackError::NoTCPHostFound(target, port)), + Some(result) => { + let stream = TestingStream::new(target, port); + let retval = stream.clone(); + match result.send(stream).await { + Ok(()) => Ok(GenericStream::new(retval)), + Err(_) => Err(TestStackError::FailureToSend), + } + } + } + } + + async fn listen>( + &mut self, + addr: A, + mut port: u16, + ) -> Result, Self::Error> { + let mut table = self.tcp_listeners.lock().await; + let target = addr.into(); + let (sender, receiver) = bounded(5); + + if port == 0 { + port = self.next_random_socket; + self.next_random_socket += 1; + } + + table.insert((target.clone(), port), sender); + Ok(GenericListener { + internal: Box::new(TestListener::new(target, port, receiver)), + }) + } + + async fn bind>( + &mut self, + addr: A, + mut port: u16, + ) -> Result, Self::Error> { + let mut table = self.udp_sockets.lock().await; + let target = addr.into(); + let (sender, receiver) = bounded(5); + + if port == 0 { + port = self.next_random_socket; + self.next_random_socket += 1; + } + + table.insert((target.clone(), port), sender); + Ok(GenericDatagramSocket { + internal: Box::new(TestDatagram::new(self.clone(), target, port, receiver)), + }) + } +} + +struct TestListener { + address: SOCKSv5Address, + port: u16, + receiver: Receiver, +} + +impl TestListener { + fn new(address: SOCKSv5Address, port: u16, receiver: Receiver) -> Self { + TestListener { + address, + port, + receiver, + } + } +} + +impl HasLocalAddress for TestListener { + fn local_addr(&self) -> (SOCKSv5Address, u16) { + (self.address.clone(), self.port) + } +} + +#[async_trait] +impl Listenerlike for TestListener { + type Error = TestStackError; + + async fn accept(&self) -> Result<(GenericStream, SOCKSv5Address, u16), Self::Error> { + match self.receiver.recv().await { + Ok(next) => { + let (addr, port) = next.local_addr(); + Ok((GenericStream::new(next), addr, port)) + } + Err(_) => Err(TestStackError::AcceptFailed), + } + } +} + +#[test] +fn check_sanity() { + task::block_on(async { + // Technically, this is UDP, and UDP is lossy. We're going to assume we're not + // going to get any dropped data along here ... which is a very questionable + // assumption, morally speaking, but probably fine for most purposes. + let mut network = TestingStack::new(); + let receiver = network + .bind("localhost", 0) + .await + .expect("Failed to bind receiver socket."); + let sender = network + .bind("localhost", 0) + .await + .expect("Failed to bind sender socket."); + let buffer = [0xde, 0xea, 0xbe, 0xef]; + let (receiver_addr, receiver_port) = receiver.local_addr(); + sender + .send_to(&buffer, receiver_addr, receiver_port) + .await + .expect("Failure sending datagram!"); + let mut recvbuffer = [0; 4]; + let (s, f, p) = receiver + .recv_from(&mut recvbuffer) + .await + .expect("Didn't receive UDP message?"); + let (sender_addr, sender_port) = sender.local_addr(); + assert_eq!(s, 4); + assert_eq!(f, sender_addr); + assert_eq!(p, sender_port); + assert_eq!(recvbuffer, buffer); + }); + + task::block_on(async { + let mut network = TestingStack::new(); + + let listener = network + .listen("localhost", 0) + .await + .expect("Couldn't set up listener on localhost"); + let (listener_address, listener_port) = listener.local_addr(); + + let listener_task_handle = task::spawn(async move { + dbg!("Starting listener task!!"); + let (mut stream, addr, port) = listener.accept().await.expect("Didn't get connection"); + let mut result_buffer = [0u8; 4]; + if let Err(e) = stream.read_exact(&mut result_buffer).await { + dbg!("Error reading buffer from stream: {}", e); + } else { + dbg!("made it through read_exact"); + } + (result_buffer, addr, port) + }); + + let sender_task_handle = task::spawn(async move { + let mut sender = network + .connect(listener_address, listener_port) + .await + .expect("Coudln't connect to listener?"); + let (sender_address, sender_port) = sender.local_addr(); + let send_buffer = [0xa, 0xff, 0xab, 0x1e]; + sender + .write_all(&send_buffer) + .await + .expect("Couldn't send the write buffer"); + sender + .flush() + .await + .expect("Couldn't flush the write buffer"); + sender + .close() + .await + .expect("Couldn't close the write buffer"); + (sender_address, sender_port) + }); + + let (result, result_from, result_from_port) = listener_task_handle.await; + assert_eq!(result, [0xa, 0xff, 0xab, 0x1e]); + let (sender_address, sender_port) = sender_task_handle.await; + assert_eq!(result_from, sender_address); + assert_eq!(result_from_port, sender_port); + }); +} diff --git a/src/network/testing/datagram.rs b/src/network/testing/datagram.rs new file mode 100644 index 0000000..e274797 --- /dev/null +++ b/src/network/testing/datagram.rs @@ -0,0 +1,88 @@ +use crate::network::address::HasLocalAddress; +use crate::network::datagram::Datagramlike; +use crate::network::testing::{TestStackError, TestingStack}; +use crate::network::SOCKSv5Address; +use async_std::channel::Receiver; +use async_trait::async_trait; +use std::cmp::Ordering; + +pub struct TestDatagram { + context: TestingStack, + my_address: SOCKSv5Address, + my_port: u16, + input_stream: Receiver<(SOCKSv5Address, u16, Vec)>, +} + +impl TestDatagram { + pub fn new( + context: TestingStack, + my_address: SOCKSv5Address, + my_port: u16, + input_stream: Receiver<(SOCKSv5Address, u16, Vec)>, + ) -> Self { + TestDatagram { + context, + my_address, + my_port, + input_stream, + } + } +} + +impl HasLocalAddress for TestDatagram { + fn local_addr(&self) -> (SOCKSv5Address, u16) { + (self.my_address.clone(), self.my_port) + } +} + +#[async_trait] +impl Datagramlike for TestDatagram { + type Error = TestStackError; + + async fn send_to( + &self, + buf: &[u8], + target: SOCKSv5Address, + port: u16, + ) -> Result { + let table = self.context.udp_sockets.lock().await; + match table.get(&(target, port)) { + None => Ok(buf.len()), + Some(sender) => { + sender + .send((self.my_address.clone(), self.my_port, buf.to_vec())) + .await + .map_err(|_| TestStackError::FailureToSend)?; + Ok(buf.len()) + } + } + } + + async fn recv_from( + &self, + buffer: &mut [u8], + ) -> Result<(usize, SOCKSv5Address, u16), Self::Error> { + let (from_addr, from_port, message) = self + .input_stream + .recv() + .await + .map_err(|_| TestStackError::ReceiveFailure)?; + + match message.len().cmp(&buffer.len()) { + Ordering::Greater => { + buffer.copy_from_slice(&message[..buffer.len()]); + Ok((message.len(), from_addr, from_port)) + } + + Ordering::Less => { + (&mut buffer[..message.len()]).copy_from_slice(&message); + Ok((message.len(), from_addr, from_port)) + } + + Ordering::Equal => { + buffer.copy_from_slice(message.as_ref()); + Ok((message.len(), from_addr, from_port)) + } + } + } +} diff --git a/src/network/testing/stream.rs b/src/network/testing/stream.rs new file mode 100644 index 0000000..b926d44 --- /dev/null +++ b/src/network/testing/stream.rs @@ -0,0 +1,145 @@ +use crate::network::address::HasLocalAddress; +use crate::network::stream::Streamlike; +use crate::network::SOCKSv5Address; +use async_std::io; +use async_std::io::{Read, Write}; +use async_std::task::{Context, Poll, Waker}; +use std::cell::UnsafeCell; +use std::pin::Pin; +use std::ptr::NonNull; +use std::sync::atomic::{AtomicBool, Ordering}; + +#[derive(Clone)] +pub struct TestingStream { + address: SOCKSv5Address, + port: u16, + internals: NonNull, +} + +unsafe impl Send for TestingStream {} +unsafe impl Sync for TestingStream {} + +struct TestingStreamData { + lock: AtomicBool, + waiters: UnsafeCell>, + buffer: UnsafeCell>, +} + +unsafe impl Send for TestingStreamData {} +unsafe impl Sync for TestingStreamData {} + +impl TestingStream { + pub fn new(address: SOCKSv5Address, port: u16) -> TestingStream { + let tsd = TestingStreamData { + lock: AtomicBool::new(false), + waiters: UnsafeCell::new(Vec::new()), + buffer: UnsafeCell::new(Vec::with_capacity(16 * 1024)), + }; + + let boxed_tsd = Box::new(tsd); + let raw_ptr = Box::leak(boxed_tsd); + + TestingStream { + address, + port, + internals: NonNull::new(raw_ptr).unwrap(), + } + } + + pub fn acquire_lock(&mut self) { + loop { + let internals = unsafe { self.internals.as_mut() }; + + match internals + .lock + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + { + Err(_) => continue, + Ok(_) => return, + } + } + } + + pub fn release_lock(&mut self) { + let internals = unsafe { self.internals.as_mut() }; + internals.lock.store(false, Ordering::SeqCst); + } +} + +impl HasLocalAddress for TestingStream { + fn local_addr(&self) -> (SOCKSv5Address, u16) { + (self.address.clone(), self.port) + } +} + +impl Read for TestingStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + // so, we're going to spin here, which is less than ideal but should work fine + // in practice. we'll obviously need to be very careful to ensure that we keep + // the stuff internal to this spin really short. + self.acquire_lock(); + + let internals = unsafe { self.internals.as_mut() }; + let stream_buffer = internals.buffer.get_mut(); + + let amount_available = stream_buffer.len(); + + if amount_available == 0 { + let waker = cx.waker().clone(); + internals.waiters.get_mut().push(waker); + self.release_lock(); + return Poll::Pending; + } + + let amt_written = if buf.len() >= amount_available { + (&mut buf[0..amount_available]).copy_from_slice(stream_buffer); + stream_buffer.clear(); + amount_available + } else { + let amt_to_copy = buf.len(); + buf.copy_from_slice(&stream_buffer[0..amt_to_copy]); + stream_buffer.copy_within(amt_to_copy.., 0); + let amt_left = amount_available - amt_to_copy; + stream_buffer.resize(amt_left, 0); + amt_to_copy + }; + + self.release_lock(); + + Poll::Ready(Ok(amt_written)) + } +} + +impl Write for TestingStream { + fn poll_write( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.acquire_lock(); + let internals = unsafe { self.internals.as_mut() }; + let stream_buffer = internals.buffer.get_mut(); + + stream_buffer.extend_from_slice(buf); + for waiter in internals.waiters.get_mut().drain(0..) { + waiter.wake(); + } + self.release_lock(); + + Poll::Ready(Ok(buf.len())) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) // FIXME: Might consider having this wait until the buffer is empty + } + + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) // FIXME: Might consider putting in some open/closed logic here + } +} + +impl Streamlike for TestingStream {}