Switch to a Mutex-based approach in Streamlike.

This commit is contained in:
2021-07-25 17:17:38 -07:00
parent ca2eddf515
commit 1436b02323

View File

@@ -2,13 +2,23 @@ use async_std::task::{Context, Poll};
use futures::io; use futures::io;
use futures::io::{AsyncRead, AsyncWrite}; use futures::io::{AsyncRead, AsyncWrite};
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::{Arc, Mutex};
pub trait Streamlike: AsyncRead + AsyncWrite + Send + Sync + Unpin {} pub trait Streamlike: AsyncRead + AsyncWrite + Send + Sync + Unpin {}
#[derive(Clone)] #[derive(Clone)]
pub struct GenericStream { pub struct GenericStream {
internal: Arc<Box<dyn Streamlike>>, internal: Arc<Mutex<dyn Streamlike>>,
}
impl GenericStream {
pub fn new<T: Streamlike + 'static>(x: T) -> GenericStream {
GenericStream{
internal: Arc::new(Mutex::new(x))
}
}
}
} }
impl AsyncRead for GenericStream { impl AsyncRead for GenericStream {
@@ -17,8 +27,9 @@ impl AsyncRead for GenericStream {
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &mut [u8], buf: &mut [u8],
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
let base = Pin::into_inner(self); let mut item = self.internal.lock().unwrap();
Pin::new(base).poll_read(cx, buf) let pinned = Pin::new(&mut *item);
pinned.poll_read(cx, buf)
} }
} }
@@ -28,25 +39,28 @@ impl AsyncWrite for GenericStream {
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &[u8], buf: &[u8],
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
let base = Pin::into_inner(self); let mut item = self.internal.lock().unwrap();
Pin::new(base).poll_write(cx, buf) let pinned = Pin::new(&mut *item);
pinned.poll_write(cx, buf)
} }
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let base = Pin::into_inner(self); let mut item = self.internal.lock().unwrap();
Pin::new(base).poll_flush(cx) let pinned = Pin::new(&mut *item);
pinned.poll_flush(cx)
} }
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let base = Pin::into_inner(self); let mut item = self.internal.lock().unwrap();
Pin::new(base).poll_close(cx) let pinned = Pin::new(&mut *item);
pinned.poll_close(cx)
} }
} }
impl<T: Streamlike + 'static> From<T> for GenericStream { impl<T: Streamlike + 'static> From<T> for GenericStream {
fn from(x: T) -> GenericStream { fn from(x: T) -> GenericStream {
GenericStream { GenericStream {
internal: Arc::new(Box::new(x)), internal: Arc::new(Mutex::new(x)),
} }
} }
} }