Move async stuff to danog/php-tokio

This commit is contained in:
Daniil Gentili 2023-08-27 16:26:31 +02:00
parent 725dfec0f2
commit 253255b470
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
3 changed files with 0 additions and 184 deletions

View File

@ -1,52 +0,0 @@
use crate::binary_slice::{BinarySlice, PackSlice};
#[inline(always)]
pub unsafe fn borrow_unchecked<
'original,
'unbounded,
Ref: BorrowUnchecked<'original, 'unbounded>,
>(
reference: Ref,
) -> Ref::Unbounded {
unsafe { BorrowUnchecked::borrow_unchecked(reference) }
}
#[doc(hidden)]
pub unsafe trait BorrowUnchecked<'original, 'unbounded> {
type Unbounded;
unsafe fn borrow_unchecked(self) -> Self::Unbounded;
}
unsafe impl<'original, 'unbounded, T: 'unbounded> BorrowUnchecked<'original, 'unbounded>
for &'original T
{
type Unbounded = &'unbounded T;
#[inline(always)]
unsafe fn borrow_unchecked(self) -> Self::Unbounded {
unsafe { ::core::mem::transmute(self) }
}
}
unsafe impl<'original, 'unbounded, T: 'unbounded> BorrowUnchecked<'original, 'unbounded>
for &'original mut T
{
type Unbounded = &'unbounded mut T;
#[inline(always)]
unsafe fn borrow_unchecked(self) -> Self::Unbounded {
unsafe { ::core::mem::transmute(self) }
}
}
unsafe impl<'original, 'unbounded, T: 'unbounded + PackSlice> BorrowUnchecked<'original, 'unbounded>
for BinarySlice<'original, T>
{
type Unbounded = BinarySlice<'unbounded, T>;
#[inline(always)]
unsafe fn borrow_unchecked(self) -> Self::Unbounded {
unsafe { ::core::mem::transmute(self) }
}
}

View File

@ -1,126 +0,0 @@
use crate::boxed::ZBox;
use crate::prelude::PhpResult;
use crate::types::ZendHashTable;
use crate::zend::Function;
use std::cell::RefCell;
use std::fs::File;
use std::future::Future;
use std::io::{self, Write};
use std::os::fd::{RawFd, FromRawFd};
use std::sync::mpsc::{Sender, Receiver, channel};
use std::io::Read;
use lazy_static::lazy_static;
use tokio::runtime::Runtime;
use std::os::fd::AsRawFd;
use super::{borrow_unchecked};
lazy_static! {
pub static ref RUNTIME: Runtime = Runtime::new().expect("Could not allocate runtime");
}
thread_local! {
static EVENTLOOP: RefCell<Option<EventLoop>> = RefCell::new(None);
}
#[cfg(any(target_os = "linux", target_os = "solaris"))]
fn sys_pipe() -> io::Result<(RawFd, RawFd)> {
let mut pipefd = [0; 2];
let ret = unsafe { libc::pipe2(pipefd.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) };
if ret == -1 {
return Err(io::Error::last_os_error());
}
Ok((pipefd[0], pipefd[1]))
}
pub struct EventLoop {
fibers: ZBox<ZendHashTable>,
sender: Sender<u64>,
receiver: Receiver<u64>,
notify_sender: File,
notify_receiver: File,
get_current_suspension: Function,
dummy: [u8; 1],
}
impl EventLoop {
pub fn init() -> PhpResult<u64> {
EVENTLOOP.with_borrow_mut(|e| {
Ok(
match e {
None => e.insert(Self::new()?),
Some(ev) => ev
}.notify_receiver.as_raw_fd() as u64
)
})
}
pub fn suspend_on<T: Send + 'static, F: Future<Output = T> + Send + 'static>(future: F) -> T {
let (future, get_current_suspension) = EVENTLOOP.with_borrow_mut(move |c| {
let c = c.as_mut().unwrap();
let idx = c.fibers.len() as u64;
c.fibers.insert_at_index(idx, call_user_func!(c.get_current_suspension).unwrap()).unwrap();
let sender = c.sender.clone();
let mut notifier = c.notify_sender.try_clone().unwrap();
(RUNTIME.spawn(async move {
let res = future.await;
sender.send(idx).unwrap();
notifier.write_all(&[0]).unwrap();
res
}), unsafe {
borrow_unchecked(&c.get_current_suspension)
})
});
call_user_func!(get_current_suspension).unwrap().try_call_method("suspend", vec![]).unwrap();
return RUNTIME.block_on(future).unwrap();
}
pub fn wakeup() -> PhpResult<()> {
EVENTLOOP.with_borrow_mut(|c| {
let c = c.as_mut().unwrap();
c.notify_receiver.read_exact(&mut c.dummy).unwrap();
for fiber_id in c.receiver.try_iter() {
if let Some(fiber) = c.fibers.get_index_mut(fiber_id) {
fiber.object_mut().unwrap().try_call_method("resume", vec![])?;
c.fibers.remove_index(fiber_id);
}
}
Ok(())
})
}
pub fn shutdown() {
EVENTLOOP.set(None)
}
fn new() -> PhpResult<Self> {
let (sender, receiver) = channel();
let (notify_receiver, notify_sender) =
sys_pipe().map_err(|err| format!("Could not create pipe: {}", err))?;
call_user_func!(Function::from_function("class_exists"), "\\Revolt\\EventLoop")?;
call_user_func!(Function::from_function("interface_exists"), "\\Revolt\\EventLoop\\Suspension")?;
Ok(Self {
fibers: ZendHashTable::new(),
sender: sender,
receiver: receiver,
notify_sender: unsafe { File::from_raw_fd(notify_sender) },
notify_receiver: unsafe { File::from_raw_fd(notify_receiver) },
dummy: [0; 1],
get_current_suspension: Function::try_from_method("\\Revolt\\EventLoop", "getSuspension").ok_or("\\Revolt\\EventLoop::getSuspension does not exist")?,
})
}
}

View File

@ -8,8 +8,6 @@ mod function;
mod globals;
mod handlers;
mod module;
mod fibers;
mod borrow_unchecked;
use crate::{error::Result, ffi::php_printf};
use std::ffi::CString;
@ -20,10 +18,6 @@ pub use ex::ExecuteData;
pub use function::FunctionEntry;
pub use function::Function;
pub use globals::ExecutorGlobals;
pub use fibers::RUNTIME;
pub use fibers::EventLoop;
pub use borrow_unchecked::borrow_unchecked;
pub use borrow_unchecked::BorrowUnchecked;
pub use handlers::ZendObjectHandlers;
pub use module::ModuleEntry;