Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.

Commit 85c522f

Browse files
committed
Implement presence channel storage in Redis
1 parent 167d518 commit 85c522f

File tree

8 files changed

+285
-30
lines changed

8 files changed

+285
-30
lines changed

src/HttpApi/Controllers/Controller.php

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use Illuminate\Http\JsonResponse;
1212
use GuzzleHttp\Psr7\ServerRequest;
1313
use Illuminate\Support\Collection;
14+
use React\Promise\PromiseInterface;
1415
use Ratchet\Http\HttpServerInterface;
1516
use Psr\Http\Message\RequestInterface;
1617
use BeyondCode\LaravelWebSockets\Apps\App;
@@ -30,7 +31,7 @@ abstract class Controller implements HttpServerInterface
3031
/** @var int */
3132
protected $contentLength;
3233

33-
/** @var \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager */
34+
/** @var ChannelManager */
3435
protected $channelManager;
3536

3637
public function __construct(ChannelManager $channelManager)
@@ -92,8 +93,23 @@ protected function handleRequest(ConnectionInterface $connection)
9293
->ensureValidAppId($laravelRequest->appId)
9394
->ensureValidSignature($laravelRequest);
9495

96+
// Invoke the controller action
9597
$response = $this($laravelRequest);
9698

99+
// Allow for async IO in the controller action
100+
if ($response instanceof PromiseInterface) {
101+
$response->then(function($response) use ($connection) {
102+
$this->sendAndClose($connection, $response);
103+
});
104+
105+
return;
106+
}
107+
108+
$this->sendAndClose($connection, $response);
109+
}
110+
111+
protected function sendAndClose(ConnectionInterface $connection, $response)
112+
{
97113
$connection->send(JsonResponse::create($response));
98114
$connection->close();
99115
}

src/HttpApi/Controllers/FetchChannelsController.php

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
use Illuminate\Support\Str;
66
use Illuminate\Http\Request;
77
use Illuminate\Support\Collection;
8+
use React\Promise\PromiseInterface;
9+
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
810
use BeyondCode\LaravelWebSockets\WebSockets\Channels\PresenceChannel;
911

1012
class FetchChannelsController extends Controller
@@ -21,10 +23,39 @@ public function __invoke(Request $request)
2123
});
2224
}
2325

26+
if (config('websockets.replication.enabled') === true) {
27+
// We want to get the channel user count all in one shot when
28+
// using a replication backend rather than doing individual queries.
29+
// To do so, we first collect the list of channel names.
30+
$channelNames = $channels->map(function (PresenceChannel $channel) use ($request) {
31+
return $channel->getChannelName();
32+
})->toArray();
33+
34+
/** @var PromiseInterface $memberCounts */
35+
// We ask the replication backend to get us the member count per channel
36+
$memberCounts = app(ReplicationInterface::class)
37+
->channelMemberCounts($request->appId, $channelNames);
38+
39+
// We return a promise since the backend runs async. We get $counts back
40+
// as a key-value array of channel names and their member count.
41+
return $memberCounts->then(function (array $counts) use ($channels) {
42+
return $this->collectUserCounts($channels, function(PresenceChannel $channel) use ($counts) {
43+
return $counts[$channel->getChannelName()];
44+
});
45+
});
46+
}
47+
48+
return $this->collectUserCounts($channels, function(PresenceChannel $channel) {
49+
return $channel->getUserCount();
50+
});
51+
}
52+
53+
protected function collectUserCounts(Collection $channels, callable $transformer)
54+
{
2455
return [
25-
'channels' => $channels->map(function ($channel) {
56+
'channels' => $channels->map(function (PresenceChannel $channel) use ($transformer) {
2657
return [
27-
'user_count' => count($channel->getUsers()),
58+
'user_count' => $transformer($channel),
2859
];
2960
})->toArray() ?: new \stdClass,
3061
];

src/HttpApi/Controllers/FetchUsersController.php

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Illuminate\Http\Request;
66
use Illuminate\Support\Collection;
7+
use React\Promise\PromiseInterface;
78
use Symfony\Component\HttpKernel\Exception\HttpException;
89
use BeyondCode\LaravelWebSockets\WebSockets\Channels\PresenceChannel;
910

@@ -21,8 +22,21 @@ public function __invoke(Request $request)
2122
throw new HttpException(400, 'Invalid presence channel "'.$request->channelName.'"');
2223
}
2324

25+
$users = $channel->getUsers($request->appId);
26+
27+
if ($users instanceof PromiseInterface) {
28+
return $users->then(function(array $users) {
29+
return $this->collectUsers($users);
30+
});
31+
}
32+
33+
return $this->collectUsers($users);
34+
}
35+
36+
protected function collectUsers(array $users)
37+
{
2438
return [
25-
'users' => Collection::make($channel->getUsers())->map(function ($user) {
39+
'users' => Collection::make($users)->map(function ($user) {
2640
return ['id' => $user->user_id];
2741
})->values(),
2842
];

src/PubSub/Redis/RedisClient.php

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use Clue\React\Redis\Client;
88
use Clue\React\Redis\Factory;
99
use React\EventLoop\LoopInterface;
10+
use React\Promise\PromiseInterface;
1011
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
1112
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
1213

@@ -183,6 +184,72 @@ public function publish(string $appId, string $channel, stdClass $payload): bool
183184
return true;
184185
}
185186

187+
/**
188+
* Add a member to a channel. To be called when they have
189+
* subscribed to the channel.
190+
*
191+
* @param string $appId
192+
* @param string $channel
193+
* @param string $socketId
194+
* @param string $data
195+
*/
196+
public function joinChannel(string $appId, string $channel, string $socketId, string $data)
197+
{
198+
$this->publishClient->__call('hset', ["$appId:$channel", $socketId, $data]);
199+
}
200+
201+
/**
202+
* Remove a member from the channel. To be called when they have
203+
* unsubscribed from the channel.
204+
*
205+
* @param string $appId
206+
* @param string $channel
207+
* @param string $socketId
208+
*/
209+
public function leaveChannel(string $appId, string $channel, string $socketId)
210+
{
211+
$this->publishClient->__call('hdel', ["$appId:$channel", $socketId]);
212+
}
213+
214+
/**
215+
* Retrieve the full information about the members in a presence channel.
216+
*
217+
* @param string $appId
218+
* @param string $channel
219+
* @return PromiseInterface
220+
*/
221+
public function channelMembers(string $appId, string $channel): PromiseInterface
222+
{
223+
return $this->publishClient->__call('hgetall', ["$appId:$channel"])
224+
->then(function($members) {
225+
// The data is expected as objects, so we need to JSON decode
226+
return array_walk($members, function($user) {
227+
return json_decode($user);
228+
});
229+
});
230+
}
231+
232+
/**
233+
* Get the amount of users subscribed for each presence channel.
234+
*
235+
* @param string $appId
236+
* @param array $channelNames
237+
* @return PromiseInterface
238+
*/
239+
public function channelMemberCounts(string $appId, array $channelNames): PromiseInterface
240+
{
241+
$this->publishClient->__call('multi', []);
242+
243+
foreach ($channelNames as $channel) {
244+
$this->publishClient->__call('hlen', ["$appId:$channel"]);
245+
}
246+
247+
return $this->publishClient->__call('exec', [])
248+
->then(function($data) use ($channelNames) {
249+
return array_combine($channelNames, $data);
250+
});
251+
}
252+
186253
/**
187254
* Build the Redis connection URL from Laravel database config.
188255
*

src/PubSub/ReplicationInterface.php

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use stdClass;
66
use React\EventLoop\LoopInterface;
7+
use React\Promise\PromiseInterface;
78

89
interface ReplicationInterface
910
{
@@ -40,4 +41,43 @@ public function subscribe(string $appId, string $channel): bool;
4041
* @return bool
4142
*/
4243
public function unsubscribe(string $appId, string $channel): bool;
44+
45+
/**
46+
* Add a member to a channel. To be called when they have
47+
* subscribed to the channel.
48+
*
49+
* @param string $appId
50+
* @param string $channel
51+
* @param string $socketId
52+
* @param string $data
53+
*/
54+
public function joinChannel(string $appId, string $channel, string $socketId, string $data);
55+
56+
/**
57+
* Remove a member from the channel. To be called when they have
58+
* unsubscribed from the channel.
59+
*
60+
* @param string $appId
61+
* @param string $channel
62+
* @param string $socketId
63+
*/
64+
public function leaveChannel(string $appId, string $channel, string $socketId);
65+
66+
/**
67+
* Retrieve the full information about the members in a presence channel.
68+
*
69+
* @param string $appId
70+
* @param string $channel
71+
* @return PromiseInterface
72+
*/
73+
public function channelMembers(string $appId, string $channel): PromiseInterface;
74+
75+
/**
76+
* Get the amount of users subscribed for each presence channel.
77+
*
78+
* @param string $appId
79+
* @param array $channelNames
80+
* @return PromiseInterface
81+
*/
82+
public function channelMemberCounts(string $appId, array $channelNames): PromiseInterface;
4383
}

src/WebSockets/Channels/Channel.php

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ public function __construct(string $channelName)
2222
$this->channelName = $channelName;
2323
}
2424

25+
public function getChannelName(): string
26+
{
27+
return $this->channelName;
28+
}
29+
2530
public function hasConnections(): bool
2631
{
2732
return count($this->subscribedConnections) > 0;
@@ -32,6 +37,9 @@ public function getSubscribedConnections(): array
3237
return $this->subscribedConnections;
3338
}
3439

40+
/**
41+
* @throws InvalidSignature
42+
*/
3543
protected function verifySignature(ConnectionInterface $connection, stdClass $payload)
3644
{
3745
$signature = "{$connection->socketId}:{$this->channelName}";
@@ -40,12 +48,15 @@ protected function verifySignature(ConnectionInterface $connection, stdClass $pa
4048
$signature .= ":{$payload->channel_data}";
4149
}
4250

43-
if (Str::after($payload->auth, ':') !== hash_hmac('sha256', $signature, $connection->app->secret)) {
51+
if (! hash_equals(
52+
hash_hmac('sha256', $signature, $connection->app->secret),
53+
Str::after($payload->auth, ':'))
54+
) {
4455
throw new InvalidSignature();
4556
}
4657
}
4758

48-
/*
59+
/**
4960
* @link https://pusher.com/docs/pusher_protocol#presence-channel-events
5061
*/
5162
public function subscribe(ConnectionInterface $connection, stdClass $payload)
@@ -128,7 +139,7 @@ public function broadcastToEveryoneExcept($payload, ?string $socketId = null)
128139
}
129140
}
130141

131-
public function toArray(): array
142+
public function toArray()
132143
{
133144
return [
134145
'occupied' => count($this->subscribedConnections) > 0,

0 commit comments

Comments
 (0)