mirror of
https://github.com/danog/parallel.git
synced 2024-12-02 09:37:57 +01:00
parent
15ead6c976
commit
6e310d0219
@ -6,6 +6,7 @@ use Amp\Failure;
|
||||
use Amp\Promise;
|
||||
use Amp\Success;
|
||||
use Amp\Sync\PosixSemaphore;
|
||||
use Amp\Sync\SyncException;
|
||||
use function Amp\call;
|
||||
|
||||
/**
|
||||
@ -50,7 +51,7 @@ final class SharedMemoryParcel implements Parcel
|
||||
/** @var PosixSemaphore A semaphore for synchronizing on the parcel. */
|
||||
private $semaphore;
|
||||
|
||||
/** @var int An open handle to the shared memory segment. */
|
||||
/** @var resource|null An open handle to the shared memory segment. */
|
||||
private $handle;
|
||||
|
||||
/** @var int */
|
||||
@ -64,6 +65,10 @@ final class SharedMemoryParcel implements Parcel
|
||||
* @param int $permissions Permissions to access the semaphore. Use file permission format specified as 0xxx.
|
||||
*
|
||||
* @return self
|
||||
*
|
||||
* @throws SharedMemoryException
|
||||
* @throws SyncException
|
||||
* @throws \Error If the size or permissions are invalid.
|
||||
*/
|
||||
public static function create(string $id, $value, int $size = 8192, int $permissions = 0600): self
|
||||
{
|
||||
@ -76,6 +81,8 @@ final class SharedMemoryParcel implements Parcel
|
||||
* @param string $id
|
||||
*
|
||||
* @return self
|
||||
*
|
||||
* @throws SharedMemoryException
|
||||
*/
|
||||
public static function use(string $id): self
|
||||
{
|
||||
@ -85,21 +92,12 @@ final class SharedMemoryParcel implements Parcel
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new local object container.
|
||||
*
|
||||
* The object given will be assigned a new object ID and will have a
|
||||
* reference to it stored in memory local to the thread.
|
||||
*
|
||||
* @param mixed $value The value to store in the container.
|
||||
* @param int $size The number of bytes to allocate for the object.
|
||||
* If not specified defaults to 16384 bytes.
|
||||
* @param int $permissions The access permissions to set for the object.
|
||||
* If not specified defaults to 0600.
|
||||
* @param string $id
|
||||
*/
|
||||
private function __construct(string $id)
|
||||
{
|
||||
if (!\extension_loaded("shmop")) {
|
||||
throw new \Error(__CLASS__ . " requires the shmop extension.");
|
||||
throw new \Error(__CLASS__ . " requires the shmop extension");
|
||||
}
|
||||
|
||||
$this->id = $id;
|
||||
@ -110,9 +108,21 @@ final class SharedMemoryParcel implements Parcel
|
||||
* @param mixed $value
|
||||
* @param int $size
|
||||
* @param int $permissions
|
||||
*
|
||||
* @throws SharedMemoryException
|
||||
* @throws SyncException
|
||||
* @throws \Error If the size or permissions are invalid.
|
||||
*/
|
||||
private function init($value, int $size = 8192, int $permissions = 0600): void
|
||||
{
|
||||
if ($size <= 0) {
|
||||
throw new \Error('The memory size must be greater than 0');
|
||||
}
|
||||
|
||||
if ($permissions <= 0 || $permissions > 0777) {
|
||||
throw new \Error('Invalid permissions');
|
||||
}
|
||||
|
||||
$this->semaphore = PosixSemaphore::create($this->id, 1);
|
||||
$this->initializer = \getmypid();
|
||||
|
||||
@ -153,20 +163,24 @@ final class SharedMemoryParcel implements Parcel
|
||||
*/
|
||||
public function unwrap(): Promise
|
||||
{
|
||||
try {
|
||||
if ($this->isFreed()) {
|
||||
return new Failure(new SharedMemoryException('The object has already been freed.'));
|
||||
throw new SharedMemoryException('The object has already been freed');
|
||||
}
|
||||
|
||||
$header = $this->getHeader();
|
||||
|
||||
// Make sure the header is in a valid state and format.
|
||||
if ($header['state'] !== self::STATE_ALLOCATED || $header['size'] <= 0) {
|
||||
new Failure(new SharedMemoryException('Shared object memory is corrupt.'));
|
||||
throw new SharedMemoryException('Shared object memory is corrupt');
|
||||
}
|
||||
|
||||
// Read the actual value data from memory and unserialize it.
|
||||
$data = $this->memGet(self::MEM_DATA_OFFSET, $header['size']);
|
||||
return new Success(\unserialize($data));
|
||||
} catch (\Exception $exception) {
|
||||
return new Failure($exception);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -180,7 +194,7 @@ final class SharedMemoryParcel implements Parcel
|
||||
private function wrap($value): void
|
||||
{
|
||||
if ($this->isFreed()) {
|
||||
throw new SharedMemoryException('The object has already been freed.');
|
||||
throw new SharedMemoryException('The object has already been freed');
|
||||
}
|
||||
|
||||
$serialized = \serialize($value);
|
||||
@ -278,6 +292,8 @@ final class SharedMemoryParcel implements Parcel
|
||||
/**
|
||||
* Updates the current memory segment handle, handling any moves made on the
|
||||
* data.
|
||||
*
|
||||
* @throws SharedMemoryException
|
||||
*/
|
||||
private function handleMovedMemory(): void
|
||||
{
|
||||
@ -302,6 +318,8 @@ final class SharedMemoryParcel implements Parcel
|
||||
* Reads and returns the data header at the current memory segment.
|
||||
*
|
||||
* @return array An associative array of header data.
|
||||
*
|
||||
* @throws SharedMemoryException
|
||||
*/
|
||||
private function getHeader(): array
|
||||
{
|
||||
@ -315,6 +333,8 @@ final class SharedMemoryParcel implements Parcel
|
||||
* @param int $state An object state.
|
||||
* @param int $size The size of the stored data, or other value.
|
||||
* @param int $permissions The permissions mask on the memory segment.
|
||||
*
|
||||
* @throws SharedMemoryException
|
||||
*/
|
||||
private function setHeader(int $state, int $size, int $permissions): void
|
||||
{
|
||||
@ -329,13 +349,16 @@ final class SharedMemoryParcel implements Parcel
|
||||
* @param string $mode The mode to open the shared memory in.
|
||||
* @param int $permissions Process permissions on the shared memory.
|
||||
* @param int $size The size to crate the shared memory in bytes.
|
||||
*
|
||||
* @throws SharedMemoryException
|
||||
*/
|
||||
private function memOpen(int $key, string $mode, int $permissions, int $size): void
|
||||
{
|
||||
$this->handle = @\shmop_open($key, $mode, $permissions, $size);
|
||||
if ($this->handle === false) {
|
||||
throw new SharedMemoryException('Failed to create shared memory block.');
|
||||
$handle = @\shmop_open($key, $mode, $permissions, $size);
|
||||
if ($handle === false) {
|
||||
throw new SharedMemoryException('Failed to create shared memory block');
|
||||
}
|
||||
$this->handle = $handle;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -345,12 +368,14 @@ final class SharedMemoryParcel implements Parcel
|
||||
* @param int $size The number of bytes to read.
|
||||
*
|
||||
* @return string The binary data at the given offset.
|
||||
*
|
||||
* @throws SharedMemoryException
|
||||
*/
|
||||
private function memGet(int $offset, int $size): string
|
||||
{
|
||||
$data = \shmop_read($this->handle, $offset, $size);
|
||||
if ($data === false) {
|
||||
throw new SharedMemoryException('Failed to read from shared memory block.');
|
||||
throw new SharedMemoryException('Failed to read from shared memory block');
|
||||
}
|
||||
return $data;
|
||||
}
|
||||
@ -360,21 +385,25 @@ final class SharedMemoryParcel implements Parcel
|
||||
*
|
||||
* @param int $offset The offset to write to.
|
||||
* @param string $data The binary data to write.
|
||||
*
|
||||
* @throws SharedMemoryException
|
||||
*/
|
||||
private function memSet(int $offset, string $data): void
|
||||
{
|
||||
if (!\shmop_write($this->handle, $data, $offset)) {
|
||||
throw new SharedMemoryException('Failed to write to shared memory block.');
|
||||
throw new SharedMemoryException('Failed to write to shared memory block');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Requests the shared memory segment to be deleted.
|
||||
*
|
||||
* @throws SharedMemoryException
|
||||
*/
|
||||
private function memDelete(): void
|
||||
{
|
||||
if (!\shmop_delete($this->handle)) {
|
||||
throw new SharedMemoryException('Failed to discard shared memory block.');
|
||||
throw new SharedMemoryException('Failed to discard shared memory block');
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6,6 +6,7 @@ use Amp\Delayed;
|
||||
use Amp\Parallel\Context\Process;
|
||||
use Amp\Parallel\Sync\Parcel;
|
||||
use Amp\Parallel\Sync\SharedMemoryParcel;
|
||||
use Amp\Sync\SyncException;
|
||||
|
||||
/**
|
||||
* @requires extension shmop
|
||||
@ -28,7 +29,7 @@ class SharedMemoryParcelTest extends AbstractParcelTest
|
||||
$this->parcel = null;
|
||||
}
|
||||
|
||||
public function testObjectOverflowMoved()
|
||||
public function testObjectOverflowMoved(): \Generator
|
||||
{
|
||||
$object = SharedMemoryParcel::create(self::ID, 'hi', 2);
|
||||
yield $object->synchronized(function () {
|
||||
@ -42,7 +43,7 @@ class SharedMemoryParcelTest extends AbstractParcelTest
|
||||
* @group posix
|
||||
* @requires extension pcntl
|
||||
*/
|
||||
public function testSetInSeparateProcess()
|
||||
public function testSetInSeparateProcess(): \Generator
|
||||
{
|
||||
$object = SharedMemoryParcel::create(self::ID, 42);
|
||||
|
||||
@ -61,4 +62,29 @@ class SharedMemoryParcelTest extends AbstractParcelTest
|
||||
$this->assertSame(44, yield $process->join()); // Wait for child process to finish.
|
||||
$this->assertEquals(44, yield $object->unwrap());
|
||||
}
|
||||
|
||||
public function testInvalidSize(): void
|
||||
{
|
||||
$this->expectException(\Error::class);
|
||||
$this->expectExceptionMessage('size must be greater than 0');
|
||||
|
||||
SharedMemoryParcel::create(self::ID, 42, -1);
|
||||
}
|
||||
|
||||
public function testInvalidPermissions(): void
|
||||
{
|
||||
$this->expectException(\Error::class);
|
||||
$this->expectExceptionMessage('Invalid permissions');
|
||||
|
||||
SharedMemoryParcel::create(self::ID, 42, 8192, 0);
|
||||
}
|
||||
|
||||
|
||||
public function testNotFound(): void
|
||||
{
|
||||
$this->expectException(SyncException::class);
|
||||
$this->expectExceptionMessage('No semaphore with that ID found');
|
||||
|
||||
SharedMemoryParcel::use('invalid');
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user