Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## [Unreleased]

### Changed

- Requires `innmind/foundation:~1.5`

### Fixed

- PHP `8.4` deprecations
Expand Down
6 changes: 3 additions & 3 deletions benchmark/client.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
Factory,
Transport\Connection,
};
use Innmind\Socket\Internet\Transport;
use Innmind\TimeContinuum\Earth\ElapsedPeriod;
use Innmind\IO\Sockets\Internet\Transport;
use Innmind\TimeContinuum\Period;
use Innmind\Url\Url;
use Innmind\OperatingSystem\Factory as OSFactory;

Expand All @@ -18,6 +18,6 @@
->make(
Transport::tcp(),
Url::of('//guest:guest@localhost:5672/'),
new ElapsedPeriod(1000),
Period::second(1)->asElapsedPeriod(),
)
->listenSignals($os->process());
12 changes: 2 additions & 10 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,8 @@
},
"require": {
"php": "~8.2",
"innmind/immutable": "~5.7",
"innmind/time-continuum": "~3.1",
"innmind/math": "~6.0",
"innmind/url": "~4.1",
"ramsey/uuid": "~4.0",
"innmind/operating-system": "~5.0",
"innmind/media-type": "~2.0",
"innmind/filesystem": "~7.0",
"innmind/stream": "~4.0",
"innmind/io": "~2.6"
"innmind/foundation": "~1.5",
"ramsey/uuid": "~4.0"
},
"autoload": {
"psr-4": {
Expand Down
6 changes: 3 additions & 3 deletions fixtures/forever-consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@
Command\Consume,
};
use Innmind\OperatingSystem\Factory as OSFactory;
use Innmind\Socket\Internet\Transport;
use Innmind\TimeContinuum\Earth\ElapsedPeriod;
use Innmind\IO\Sockets\Internet\Transport;
use Innmind\TimeContinuum\Period;
use Innmind\Url\Url;

$os = OSFactory::build();
$success = Factory::of($os)
->make(
Transport::tcp(),
Url::of('//guest:guest@localhost:5672/'),
new ElapsedPeriod(1000),
Period::second(1)->asElapsedPeriod(),
)
->listenSignals($os->process())
->with(DeclareQueue::of('always-empty'))
Expand Down
2 changes: 1 addition & 1 deletion src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
namespace Innmind\AMQP;

use Innmind\OperatingSystem\OperatingSystem;
use Innmind\Socket\Internet\Transport as Socket;
use Innmind\IO\Sockets\Internet\Transport as Socket;
use Innmind\Url\Url;
use Innmind\TimeContinuum\ElapsedPeriod;

Expand Down
11 changes: 6 additions & 5 deletions src/TimeContinuum/Format/Timestamp.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
use Innmind\TimeContinuum\Format;

/**
* @psalm-immutable
* @internal
*/
final class Timestamp implements Format
final class Timestamp
{
#[\Override]
public function toString(): string
/**
* @psalm-pure
*/
public static function new(): Format
{
return 'U';
return Format::of('U');
}
}
34 changes: 10 additions & 24 deletions src/Transport/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,10 @@
Exception\FrameExceedAllowedSize,
};
use Innmind\OperatingSystem\CurrentProcess\Signals;
use Innmind\Socket\{
Internet\Transport,
Client as Socket,
};
use Innmind\IO\{
Sockets\Client,
Readable\Frame as IOFrame,
Sockets\Clients\Client,
Sockets\Internet\Transport,
Frame as IOFrame,
};
use Innmind\Url\Url;
use Innmind\TimeContinuum\{
Expand All @@ -52,17 +49,16 @@
final class Connection
{
private Protocol $protocol;
/** @var Client<Socket> */
private Client $socket;
/** @var IOFrame<Frame> */
private IOFrame $frame;
private MaxChannels $maxChannels;
private MaxFrameSize $maxFrameSize;
private Heartbeat $heartbeat;
private SignalListener $signals;
private bool $closed = false;

/**
* @param Client<Socket> $socket
* @param IOFrame<Frame> $frame
*/
private function __construct(
Expand Down Expand Up @@ -94,20 +90,19 @@ public static function open(
Clock $clock,
Remote $remote,
): Maybe {
/** @psalm-suppress InvalidArgument */
return $remote
->socket(
$transport,
$server->authority()->withoutUserInformation(),
)
->map(
static fn($socket) => $socket
->timeoutAfter($timeout)
->timeoutAfter($timeout->asPeriod())
->toEncoding(Str\Encoding::ascii),
)
->flatMap(
static fn($socket) => $socket
->send(Sequence::of($protocol->version()->pack()))
->sink(Sequence::of($protocol->version()->pack()))
->map(static fn() => $socket),
)
->map(static fn($socket) => new self(
Expand All @@ -119,6 +114,7 @@ public static function open(
(new FrameReader)($protocol),
SignalListener::uninstalled(),
))
->maybe()
->flatMap(new Start($server->authority()))
->flatMap(new Handshake($server->authority()))
->flatMap(new OpenVHost($server->path()));
Expand Down Expand Up @@ -199,18 +195,13 @@ public function close(): Maybe
{
$this->signals->uninstall();

if ($this->closed()) {
/** @var Maybe<SideEffect> */
return Maybe::nothing();
}

return $this
->request(
static fn($protocol) => $protocol->connection()->close(Close::demand()),
Method::connectionCloseOk,
)
->flatMap(fn() => $this->socket->unwrap()->close())
->maybe()
->flatMap(fn() => $this->socket->close()->maybe())
->map(static fn() => new SideEffect);
}

Expand All @@ -234,7 +225,7 @@ public function tune(
->map(fn() => new self(
$this->protocol,
$this->heartbeat->adjust($heartbeat),
$this->socket->timeoutAfter($heartbeat),
$this->socket->timeoutAfter($heartbeat->asPeriod()),
$maxChannels,
$maxFrameSize,
$this->frame,
Expand Down Expand Up @@ -286,7 +277,7 @@ private function sendFrames(callable $frames): Either
return $this
->socket
->abortWhen($this->signals->notified(...))
->send($data)
->sink($data)
->either()
->eitherWay(
static fn() => Either::right(new SideEffect),
Expand Down Expand Up @@ -371,9 +362,4 @@ private function ensureValidFrame(
/** @var Either<Failure, ReceivedFrame> */
return Either::left(Failure::unexpectedFrame());
}

private function closed(): bool
{
return $this->socket->unwrap()->closed();
}
}
11 changes: 6 additions & 5 deletions src/Transport/Connection/FrameReader.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
Frame\Value\UnsignedShortInteger,
Frame\Value\UnsignedLongInteger,
};
use Innmind\IO\Readable\Frame as IOFrame;
use Innmind\IO\Frame as IOFrame;
use Innmind\Immutable\Str;

/**
Expand Down Expand Up @@ -53,7 +53,7 @@ private function readFrame(
Type::method => $this->readMethod($protocol, $channel),
Type::header => $this->readHeader($protocol, $channel),
Type::body => $this->readBody($channel, $length),
Type::heartbeat => IOFrame\NoOp::of(Frame::heartbeat()),
Type::heartbeat => IOFrame::just(Frame::heartbeat()),
})
->flatMap(
static fn($frame) => UnsignedOctet::frame()
Expand Down Expand Up @@ -119,7 +119,8 @@ private function readHeader(
->map(static fn($value) => $value->unwrap()->original())
->flatMap(MethodClass::frame(...))
->flatMap(
static fn($class) => IOFrame\Chunk::of(2) // walk over the weight definition
static fn($class) => IOFrame::chunk(2) // walk over the weight definition
->strict()
->map(static fn() => $class),
)
->flatMap(
Expand All @@ -143,8 +144,8 @@ private function readBody(
int $length,
): IOFrame {
return (match ($length) {
0 => IOFrame\NoOp::of(Str::of('')),
default => IOFrame\Chunk::of($length),
0 => IOFrame::just(Str::of('')),
default => IOFrame::chunk($length)->strict(),
})
->map(static fn($data) => Frame::body($channel, $data));
}
Expand Down
5 changes: 3 additions & 2 deletions src/Transport/Connection/Handshake.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
Model\Connection\MaxFrameSize,
Failure,
};
use Innmind\TimeContinuum\Earth\ElapsedPeriod;
use Innmind\TimeContinuum\Period;
use Innmind\Url\Authority;
use Innmind\Immutable\{
Maybe,
Expand Down Expand Up @@ -87,7 +87,8 @@ private function maybeTune(Connection $connection, Frame $frame): Either
->get(2)
->keep(Instance::of(Value\UnsignedShortInteger::class))
->map(static fn($value) => $value->original())
->map(ElapsedPeriod::of(...));
->map(Period::millisecond(...))
->map(static fn($period) => $period->asElapsedPeriod());

return Maybe::all($maxChannels, $maxFrameSize, $heartbeat)
->flatMap($connection->tune(...))
Expand Down
31 changes: 18 additions & 13 deletions src/Transport/Connection/MessageReader.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
Failure,
};
use Innmind\OperatingSystem\Filesystem;
use Innmind\TimeContinuum\Earth\ElapsedPeriod;
use Innmind\TimeContinuum\Period;
use Innmind\Filesystem\File\Content;
use Innmind\Stream\Stream\Size\Unit;
use Innmind\IO\Stream\Size\Unit;
use Innmind\Validation\Is;
use Innmind\Immutable\{
Str,
Predicate\Instance,
Expand Down Expand Up @@ -173,8 +174,14 @@ private function addProperties(
static fn(Maybe $value, Message $message) => $value
->keep(Instance::of(Value\ShortString::class))
->map(static fn($value) => (int) $value->original()->toString())
->flatMap(ElapsedPeriod::maybe(...))
->map(static fn($expiration) => $message->withExpiration($expiration)),
->keep(
Is::int()
->positive()
->or(Is::value(0))
->asPredicate(),
)
->map(Period::millisecond(...))
->map(static fn($expiration) => $message->withExpiration($expiration->asElapsedPeriod())),
],
[
7,
Expand Down Expand Up @@ -254,7 +261,8 @@ private function readMessage(
->wait()
->maybe()
->flatMap(static fn($received) => $received->frame()->content())
->map(static fn($chunk) => $chunk->toEncoding(Str\Encoding::ascii));
->map(static fn($chunk) => $chunk->toEncoding(Str\Encoding::ascii))
->attempt(static fn() => new \RuntimeException('Failed to read chunk'));
$read += $chunk->match(
static fn($chunk) => $chunk->length(),
static fn() => 0,
Expand All @@ -268,20 +276,17 @@ private function readMessage(
}
});

/** @psalm-suppress MixedArgumentTypeCoercion Because of the reduce it doesn't understand the type of the Sequence */
/** @var Sequence<Str> */
$unfolded = Sequence::of();
$content = match (true) {
$bodySize <= Unit::megabytes->times(2) => $chunks
->reduce(
Maybe::just(Sequence::of()),
static fn(Maybe $content, $chunk) => Maybe::all($content, $chunk)->map(
static fn(Sequence $chunks, Str $chunk) => ($chunks)($chunk),
),
)
->sink($unfolded)
->attempt(static fn($chunks, $chunk) => $chunk->map($chunks))
->map(Content::ofChunks(...)),
default => $this
->filesystem
->temporary($chunks)
->memoize(), // to prevent using a deferred Maybe that would result in out of order reading the socket
->memoize(), // to prevent using a deferred Attempt that would result in out of order reading the socket
};

return $content
Expand Down
Loading