diff --git a/src/Core/TransporterFactory.php b/src/Core/TransporterFactory.php index fb21a29..98a14e5 100644 --- a/src/Core/TransporterFactory.php +++ b/src/Core/TransporterFactory.php @@ -7,6 +7,7 @@ use GuzzleHttp\Exception\GuzzleException; use Redberry\MCPClient\Core\Transporters\HttpTransporter; use Redberry\MCPClient\Core\Transporters\StdioTransporter; +use Redberry\MCPClient\Core\Transporters\StreamableHttpTransporter; use Redberry\MCPClient\Core\Transporters\Transporter as ITransporter; use Redberry\MCPClient\Enums\Transporters; @@ -29,6 +30,7 @@ public static function make( return match ($type) { 'http' => new HttpTransporter($config), + 'streamable_http' => new StreamableHttpTransporter($config), 'stdio' => new StdioTransporter($config), default => throw new \InvalidArgumentException("Unsupported transporter type: {$type}"), }; diff --git a/src/Core/Transporters/StreamableHttpTransporter.php b/src/Core/Transporters/StreamableHttpTransporter.php new file mode 100644 index 0000000..b544176 --- /dev/null +++ b/src/Core/Transporters/StreamableHttpTransporter.php @@ -0,0 +1,275 @@ +initializeClient(); + } + + /** + * Perform the “initialize” handshake and capture the MCP session ID. + * Call this *once* before you start sending other RPCs. + * + * @throws GuzzleException + * @throws RandomException + */ + private function initializeSession(): void + { + if ($this->initialized) { + return; + } + + $payload = $this->preparePayload('initialize'); + + $response = $this->client->request('POST', '', [ + 'json' => $payload, + 'timeout' => $this->config['timeout'] ?? 30, + 'stream' => false, + ]); + + $hdr = $response->getHeader('mcp-session-id'); + if (! empty($hdr)) { + $this->sessionId = $hdr[0]; + } + + $this->initialized = true; + } + + /** + * @throws TransporterRequestException + * @throws GuzzleException + * @throws JsonException + * @throws RandomException + */ + public function request(string $action, ?array $params = null): array + { + $this->initializeSession(); + $payload = $this->preparePayload($action, $params); + + try { + $response = $this->client->request('POST', '', [ + 'json' => $payload, + 'timeout' => $this->config['timeout'] ?? 30, + 'headers' => [ + 'mcp-session-id' => $this->sessionId, + 'Accept' => 'application/json, text/event-stream', + ], + 'stream' => true, + ]); + + return $this->parseResponse($response); + } catch (GuzzleException $e) { + throw new TransporterRequestException( + "HTTP error for $action: {$e->getMessage()}", + (int) $e->getCode(), + $e + ); + } + } + + /** + * @throws TransporterRequestException + * @throws JsonException + */ + private function parseResponse(ResponseInterface $response): array + { + $contentType = strtolower(trim(explode(';', $response->getHeaderLine('Content-Type'))[0])); + + if ($contentType === 'text/event-stream') { + return $this->parseSseStream($response); + } + + $body = (string) $response->getBody(); + $data = json_decode($body, true, 512, JSON_THROW_ON_ERROR); + + if (json_last_error() !== JSON_ERROR_NONE) { + throw new TransporterRequestException('Invalid JSON response: '.json_last_error_msg()); + } + + if (isset($data['error'])) { + throw new TransporterRequestException( + "JSON-RPC error: {$data['error']['message']}", + $data['error']['code'] ?? 0 + ); + } + + return $data['result'] ?? $data; + } + + /** + * @throws TransporterRequestException + * @throws JsonException + */ + private function parseSseStream(ResponseInterface $response): array + { + $stream = $response->getBody(); + + $buffer = ''; + $currentEvent = [ + 'event' => null, + 'data' => '', + ]; + + $final = null; + + while (! $stream->eof()) { + $chunk = $stream->read($this->config['stream_read_bytes'] ?? 8192); + if ($chunk === '') { + continue; + } + + $buffer .= $chunk; + + while (($pos = strpos($buffer, "\n")) !== false) { + $line = substr($buffer, 0, $pos); + $buffer = substr($buffer, $pos + 1); + + $line = rtrim($line, "\r"); + + if ($line === '') { + $maybe = $this->finishSseEvent($currentEvent); + if ($maybe !== null) { + $final = $maybe; + } + $currentEvent = ['event' => null, 'data' => '']; + + continue; + } + + if (str_starts_with($line, ':')) { + continue; + } + + if (str_starts_with($line, 'event:')) { + $currentEvent['event'] = trim(substr($line, strlen('event:'))); + + continue; + } + + if (str_starts_with($line, 'data:')) { + $piece = substr($line, strlen('data:')); + $piece = ltrim($piece, ' '); + + $currentEvent['data'] .= ($currentEvent['data'] === '' ? '' : "\n").$piece; + } + } + } + + $maybe = $this->finishSseEvent($currentEvent); + if ($maybe !== null) { + $final = $maybe; + } + + if ($final === null) { + throw new TransporterRequestException('Stream ended without a JSON-RPC result.'); + } + + return $final; + } + + /** + * @throws TransporterRequestException + * @throws JsonException + */ + private function finishSseEvent(array $evt): ?array + { + $dataStr = trim($evt['data'] ?? ''); + if ($dataStr === '' || $dataStr === '[DONE]') { + return null; + } + + $decoded = json_decode($dataStr, true, 512, JSON_THROW_ON_ERROR); + + if (isset($decoded['error'])) { + throw new TransporterRequestException( + "JSON-RPC error: {$decoded['error']['message']}", + $decoded['error']['code'] ?? 0 + ); + } + + if (array_key_exists('result', $decoded)) { + return $decoded['result'] ?? $decoded; + } + + return $decoded; + } + + /** + * @throws RandomException + */ + private function generateId(): string|int + { + $id = random_int(1, 1000000); + $idType = $this->config['id_type'] ?? 'int'; + + return $idType === 'integer' || $idType === 'int' ? $id : (string) $id; + } + + /** + * @throws RandomException + */ + private function preparePayload(string $action, ?array $params = null): array + { + return [ + 'jsonrpc' => '2.0', + 'method' => $action, + 'params' => $params ?? (object) [], + 'id' => $this->generateId(), + ]; + } + + private function getClientBaseConfig(): array + { + $baseUri = $this->config['base_url'] ?? 'http://localhost/api'; + $token = $this->config['token'] ?? null; + + $headers = [ + 'Accept' => 'application/json', + 'Content-Type' => 'application/json', + ]; + + if ($token) { + $headers['Authorization'] = 'Bearer '.$token; + } + + if (isset($this->config['headers']) && is_array($this->config['headers'])) { + $headers = array_merge($headers, $this->config['headers']); + } + + return [ + 'base_uri' => $baseUri, + 'headers' => $headers, + ]; + } + + /** + * @throws GuzzleException + */ + private function initializeClient(): void + { + $clientConfig = $this->getClientBaseConfig(); + $this->client = new Client($clientConfig); + } +} diff --git a/src/Enums/Transporters.php b/src/Enums/Transporters.php index b920855..1de9a9f 100644 --- a/src/Enums/Transporters.php +++ b/src/Enums/Transporters.php @@ -7,5 +7,6 @@ enum Transporters: string { case HTTP = 'http'; + case STREAMABLE_HTTP = 'streamable_http'; case STDIO = 'stdio'; } diff --git a/tests/MCPClient/MCPClientTest.php b/tests/MCPClient/MCPClientTest.php index a4b6ca4..95005bd 100644 --- a/tests/MCPClient/MCPClientTest.php +++ b/tests/MCPClient/MCPClientTest.php @@ -21,6 +21,12 @@ 'timeout' => 30, 'token' => 'token_value', ], + 'streamable' => [ + 'type' => Transporters::STREAMABLE_HTTP, + 'base_url' => 'https://example.com/mcp', + 'timeout' => 30, + 'token' => 'token_value', + ], 'npx_mcp_server' => [ 'type' => Transporters::STDIO, 'command' => [ @@ -104,6 +110,60 @@ ->toHaveCount(2); }); + test('connect works with STDIO server type', function () { + $mockFactory = Mockery::mock(TransporterFactory::class); + $mockTransporter = Mockery::mock(Transporter::class); + + $mockFactory->shouldReceive('make') + ->once() + ->with(config('mcp-client.servers.npx_mcp_server')) + ->andReturn($mockTransporter); + + $client = new MCPClient(config('mcp-client.servers'), $mockFactory); + $connected = $client->connect('npx_mcp_server'); + + expect($connected)->toBeInstanceOf(MCPClient::class); + }); + + test('callTool delegates to transporter with name and arguments as object', function () { + $mockTransporter = Mockery::mock(Transporter::class); + $mockFactory = Mockery::mock(TransporterFactory::class); + + $mockTransporter->shouldReceive('request') + ->once() + ->with('tools/call', Mockery::on(function ($params) { + // arguments should be cast to object + return isset($params['arguments']->x, $params['name']) && $params['name'] === 'doSomething' && is_object($params['arguments']) && $params['arguments']->x === 1; + })) + ->andReturn(['ok' => true]); + + $mockFactory->shouldReceive('make')->andReturn($mockTransporter); + + $client = new MCPClient(config('mcp-client.servers'), $mockFactory); + $client->connect('using_enum'); + $result = $client->callTool('doSomething', ['x' => 1]); + + expect($result)->toEqual(['ok' => true]); + }); + + test('readResource delegates to transporter with uri and returns response', function () { + $mockTransporter = Mockery::mock(Transporter::class); + $mockFactory = Mockery::mock(TransporterFactory::class); + + $mockTransporter->shouldReceive('request') + ->once() + ->with('resources/read', ['uri' => 'file:///tmp/readme.txt']) + ->andReturn(['content' => 'hello']); + + $mockFactory->shouldReceive('make')->andReturn($mockTransporter); + + $client = new MCPClient(config('mcp-client.servers'), $mockFactory); + $client->connect('using_enum'); + $result = $client->readResource('file:///tmp/readme.txt'); + + expect($result)->toEqual(['content' => 'hello']); + }); + test('tools throws exception when not connected', function () { $client = new MCPClient(config('mcp-client.servers')); diff --git a/tests/Transporter/StreamableHttpTransporterTest.php b/tests/Transporter/StreamableHttpTransporterTest.php new file mode 100644 index 0000000..8c94e9a --- /dev/null +++ b/tests/Transporter/StreamableHttpTransporterTest.php @@ -0,0 +1,165 @@ +getProperty('client'); + $prop->setAccessible(true); + $prop->setValue($transporter, $mockClient); + + $sessionProp = $ref->getProperty('sessionId'); + $sessionProp->setAccessible(true); + $sessionProp->setValue($transporter, 'test-session-id'); + + $initializedProp = $ref->getProperty('initialized'); + $initializedProp->setAccessible(true); + $initializedProp->setValue($transporter, true); + + return [$transporter, $mockClient]; + } + + test('preparePayload builds correct payload', function () { + $t = new StreamableHttpTransporter; + $m = new ReflectionMethod(StreamableHttpTransporter::class, 'preparePayload'); + $m->setAccessible(true); + + $payload = $m->invoke($t, 'testMethod', ['x' => 1]); + + expect($payload['jsonrpc'])->toBe('2.0') + ->and($payload['method'])->toBe('testMethod') + ->and($payload['params'])->toEqual(['x' => 1]) + ->and(is_int($payload['id']) || is_string($payload['id']))->toBeTrue(); + }); + + test('generateId returns int by default and string when configured', function () { + $t1 = new StreamableHttpTransporter; + $g = new ReflectionMethod(StreamableHttpTransporter::class, 'generateId'); + $g->setAccessible(true); + $id1 = $g->invoke($t1); + expect(is_int($id1))->toBeTrue(); + + $t2 = new StreamableHttpTransporter(['id_type' => 'string']); + $g2 = new ReflectionMethod(StreamableHttpTransporter::class, 'generateId'); + $g2->setAccessible(true); + $id2 = $g2->invoke($t2); + expect(is_string($id2))->toBeTrue(); + }); + + test('getClientBaseConfig defaults and overrides', function () { + $t = new StreamableHttpTransporter([ + 'base_url' => 'https://example.com/api', + 'token' => 'secret', + 'headers' => [ + 'X-Custom' => 'v', + 'Accept' => 'application/vnd.api+json', + ], + ]); + $m = new ReflectionMethod(StreamableHttpTransporter::class, 'getClientBaseConfig'); + $m->setAccessible(true); + $cfg = $m->invoke($t); + + expect($cfg['base_uri'])->toBe('https://example.com/api') + ->and($cfg['headers']['Authorization'])->toBe('Bearer secret') + ->and($cfg['headers']['X-Custom'])->toBe('v') + ->and($cfg['headers']['Accept'])->toBe('application/vnd.api+json') + ->and($cfg['headers']['Content-Type'])->toBe('application/json'); + }); + + test('successful JSON response returns result', function () { + [$t, $mock] = createStreamableWithMockedSession(); + + $resp = new Response(200, ['Content-Type' => 'application/json'], json_encode(['result' => ['foo' => 'bar']])); + + $mock->shouldReceive('request') + ->once() + ->with('POST', '', Mockery::on(static function ($options) { + return ($options['headers']['mcp-session-id'] ?? null) === 'test-session-id' + && ($options['headers']['Accept'] ?? '') === 'application/json, text/event-stream' + && ($options['stream'] ?? null) === true + && ($options['timeout'] ?? null) === 30 + && ($options['json']['method'] ?? null) === 'act'; + })) + ->andReturn($resp); + + $result = $t->request('act', ['a' => 1]); + expect($result)->toEqual(['foo' => 'bar']); + }); + + test('initializeSession captures mcp-session-id from first response', function () { + $t = new StreamableHttpTransporter; + $mock = Mockery::mock(Client::class); + + $ref = new ReflectionClass($t); + $prop = $ref->getProperty('client'); + $prop->setAccessible(true); + $prop->setValue($t, $mock); + + // First call: initialize + $initResp = new Response(200, ['mcp-session-id' => 'abc-123', 'Content-Type' => 'application/json'], json_encode(['ok' => true])); + // Second call: actual request + $reqResp = new Response(200, ['Content-Type' => 'application/json'], json_encode(['result' => ['ok' => 1]])); + + $mock->shouldReceive('request')->once()->with('POST', '', Mockery::type('array'))->andReturn($initResp); + $mock->shouldReceive('request') + ->once() + ->with('POST', '', Mockery::on(static function ($options) { + return ($options['headers']['mcp-session-id'] ?? '') === 'abc-123'; + })) + ->andReturn($reqResp); + + $result = $t->request('ping'); + expect($result)->toEqual(['ok' => 1]); + }); + + test('parses SSE stream and returns last non-null result', function () { + [$t, $mock] = createStreamableWithMockedSession(); + + $sse = <<<'SSE' +event: jsonrpc.message +data: {"jsonrpc":"2.0","id":1,"result":{"delta":"Hello"}} + +event: jsonrpc.message +data: {"jsonrpc":"2.0","id":1,"result":{"final":"World"}} + +data: [DONE] +SSE; + $body = Utils::streamFor($sse); + $resp = new Response(200, ['Content-Type' => 'text/event-stream'], $body); + + $mock->shouldReceive('request') + ->once() + ->with('POST', '', Mockery::type('array')) + ->andReturn($resp); + + $result = $t->request('stream', []); + expect($result)->toEqual(['final' => 'World']); + }); + + test('wraps Guzzle exceptions as TransporterRequestException', function () { + [$t, $mock] = createStreamableWithMockedSession(); + + $mock->shouldReceive('request') + ->once() + ->andThrow(new TransferException('boom')); + + $this->expectException(TransporterRequestException::class); + $t->request('fail'); + }); +});