Skip to content
This repository was archived by the owner on Nov 9, 2017. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions src/DNode/Connection.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php
namespace DNode;
use React\EventLoop\LoopInterface;
use React\Socket\Connection as BaseConnection;

class Connection extends BaseConnection
{
private $lastBufferSize;

public function handleData($stream)
{
if ($this->bufferSize != $this->lastBufferSize) {
$this->adjustReadBufferSize($stream);
}

$data = fread($stream, $this->bufferSize);

if ('' === $data || false === $data) {
$this->end();
} else {
$this->emit('data', array($data, $this));
}
}

protected function adjustReadBufferSize($stream)
{
$this->lastBufferSize = $this->bufferSize;
if (0 !== stream_set_read_buffer($stream, 0)) {
throw new \RuntimeException('Unable to set read buffer on stream');
}
}
}
21 changes: 17 additions & 4 deletions src/DNode/DNode.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use Evenement\EventEmitter;
use React\EventLoop\LoopInterface;
use React\Socket\Server;
use React\Socket\Connection;
use React\Socket\ConnectionInterface;

class DNode extends EventEmitter
Expand All @@ -30,17 +29,31 @@ public function using($middleware)
public function connect()
{
$params = $this->protocol->parseArgs(func_get_args());

if (!isset($params['scheme'])) {
$params['scheme'] = 'tcp';
}

if (!isset($params['host'])) {
$params['host'] = '127.0.0.1';
}

if (!isset($params['port'])) {
throw new \Exception("For now we only support TCP connections to a defined port");
throw new \Exception("For now we only support connections to a defined port");
}

if (!in_array($params['scheme'], stream_get_transports())) {
$e = new \InvalidArgumentException("Scheme {$params['scheme']} is not supported... are you missing an extension?");

$this->emit('error', array($e));
}

$client = @stream_socket_client("tcp://{$params['host']}:{$params['port']}");
$url = "{$params['scheme']}://{$params['host']}:{$params['port']}";
$client = @stream_socket_client($url);

if (!$client) {
$e = new \RuntimeException("No connection to DNode server in tcp://{$params['host']}:{$params['port']}");
$e = new \RuntimeException("No connection to DNode server in $url");

$this->emit('error', array($e));

if (!count($this->listeners('error'))) {
Expand Down
8 changes: 5 additions & 3 deletions src/DNode/Protocol.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,11 @@ public function parseArgs($args) {
continue;
}

foreach ($arg as $key => $value) {
$params[$key] = $value;
}
$arg = (array) $arg;
}

if (is_array($arg)) {
$params = array_merge($params, $arg);
continue;
}

Expand Down
13 changes: 11 additions & 2 deletions tests/DNode/ProtocolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public function provideParseArgs()
$obj->foo = 'bar';
$obj->baz = 'qux';

$arr = array(
'quux' => 'corge',
'grault' => 'garply'
);

return array(
'string number becomes port' => array(
array('port' => '8080'),
Expand All @@ -83,6 +88,10 @@ public function provideParseArgs()
array('port' => 8080),
array(8080),
),
'array becomes merged' => array(
array('quux' => 'corge', 'grault' => 'garply'),
array($arr),
),
'Closure becomes block' => array(
array('block' => $closure),
array($closure),
Expand All @@ -102,11 +111,11 @@ public function provideParseArgs()
* @test
* @covers DNode\Protocol::parseArgs
* @expectedException InvalidArgumentException
* @expectedExceptionMessage Not sure what to do about array arguments
* @expectedExceptionMessage Not sure what to do about boolean arguments
*/
public function parseArgsShouldRejectInvalidArgs()
{
$args = array(array('wat'));
$args = array(true);
$this->protocol->parseArgs($args);
}
}