Mostly working websocket stuff, some message weirdness at the moment...

This commit is contained in:
Nate Anderson
2025-02-10 18:55:15 -07:00
parent 623474e0c6
commit b37862a321
19 changed files with 543 additions and 87 deletions
+9 -3
View File
@@ -1,5 +1,5 @@
import 'package:backend/database.dart';
import 'package:backend/service/db_access.dart';
import 'package:backend/db/database.dart';
import 'package:backend/db/db_access.dart';
import 'package:backend/utils/environment.dart';
import 'package:dart_jsonwebtoken/dart_jsonwebtoken.dart';
import 'package:logging/logging.dart';
@@ -25,7 +25,13 @@ class Authenticator {
final iat = DateTime.now().millisecondsSinceEpoch ~/ 1000;
final jwt = JWT(
header: {'algo': 'HS256'},
JWTBody(uuid: newUser.uuid, roomUuid: newUser.gameRoomUuid, iat: iat, exp: iat + expTimeSecs).toJson(),
JWTBody(
uuid: newUser.uuid,
roomUuid: newUser.gameRoomUuid,
roomCode: req.roomCode,
iat: iat,
exp: iat + expTimeSecs,
).toJson(),
);
return (jwt.sign(SecretKey(jwtSecret!)), newUser);
@@ -1,4 +1,4 @@
import 'package:backend/database.dart';
import 'package:backend/db/database.dart';
import 'package:drift/drift.dart';
import 'package:logging/logging.dart';
import 'package:uuid/uuid.dart';
@@ -9,11 +9,12 @@ class Db {
static final _db = AppDatabase();
static Future<User?> getUserById(String uuid) {
log.finer('Getting user $uuid');
log.finest('Getting user $uuid');
return _db.managers.users.filter((f) => f.uuid.equals(uuid)).getSingleOrNull();
}
static Future<User?> createUser({required String username, required String roomCode}) async {
log.finest('Creating user $username in room $roomCode');
final room = await _db.managers.gameRooms
.filter((f) => f.code.equals(roomCode) & f.status.isIn([GameStatus.open, GameStatus.running]))
.getSingleOrNull()
@@ -32,21 +33,30 @@ class Db {
});
}
static Future<GameRoom?> createRoom({required String roomCode}) => _db.managers.gameRooms
.createReturningOrNull(
(o) => o(createdAt: Value(DateTime.now()), status: GameStatus.open, uuid: const Uuid().v4(), code: roomCode),
)
.catchError(
(Object err) {
log.severe('Failed to create room', err, StackTrace.current);
return null;
},
);
static Future<GameRoom?> createRoom({required String roomCode}) {
log.finest('Creating room with code $roomCode');
return _db.managers.gameRooms
.createReturningOrNull(
(o) => o(createdAt: Value(DateTime.now()), status: GameStatus.open, uuid: const Uuid().v4(), code: roomCode),
)
.catchError(
(Object err) {
log.severe('Failed to create room', err, StackTrace.current);
return null;
},
);
}
static Future<GameRoom?> getRoomByCode(String? roomCode) async {
log.finest('Getting room by code $roomCode');
final room = await _db.managers.gameRooms
.filter((f) => f.code.equals(roomCode) & f.status.isIn([GameStatus.open, GameStatus.running]))
.getSingleOrNull();
return room;
}
static Future<GameRoom?> getRoomById(String roomUuid) async {
log.finest('Getting room $roomUuid');
throw UnimplementedError();
}
}
+121
View File
@@ -0,0 +1,121 @@
import 'dart:async';
import 'dart:isolate';
import 'package:logging/logging.dart';
import 'package:shared_models/room.dart';
// class GameRoomManager {
// GameRoomManager({required SendPort this.socketManagerSendPort}) {
// managerReceivePort = ReceivePort();
// gamePorts = {};
// receiveSubscription = managerReceivePort.listen((message) {
// if (message is GameRoomManagerMessage) {
// handleMessage(message);
// } else if (message is GameRoomMessage) {
// final gameUuid = message.gameUuid;
// final gamePort = gamePorts[gameUuid];
// if (gamePort == null) {
// _logger.warning('Received GameRoomMessage for empty gamePort');
// return;
// }
// gamePort.send(message);
// } else {
// _logger.warning('Received unknown message: $message');
// }
// });
// }
// late final Map<String, SendPort> gamePorts;
// late final ReceivePort managerReceivePort;
// late final StreamSubscription<dynamic> receiveSubscription;
// final SendPort socketManagerSendPort;
// final Logger _logger = Logger('GameRoomManager');
// void close() {
// receiveSubscription.cancel();
// //TODO remove connections
// }
// Future<void> createRoom({required String roomUuid}) async {
// // receivePort
// final wsSendPort = SocketManager().createWsSendPort(roomUuid);
// await Isolate.spawn(LiveGameRoom.spawn, LiveGameRoomData(roomUuid: roomUuid, wsSendPort: wsSendPort, gameManagerSendPort: ));
// // first message from new isolate will be its SendPort
// gamePorts[roomUuid] = await roomReceivePort.first as SendPort;
// }
// void routePlayerToRoom(String roomCode, PlayerConnection player) {
// gamePorts[roomCode]?.addPlayer(player);
// }
// void handleMessage(message) {
// throw UnimplementedError();
// }
// }
class LiveGameRoomData {
LiveGameRoomData({
required this.wsSendPort,
required this.roomUuid,
});
final SendPort wsSendPort;
final String roomUuid;
}
class LiveGameRoom {
LiveGameRoom({
required this.receivePort,
required this.wsSendPort,
required this.logger,
required this.streamSubscription,
required this.roomUuid,
});
Timer? gameLoop;
// final Map<String, PlayerState> players = {};
final ReceivePort receivePort;
final SendPort wsSendPort;
final Logger logger;
final StreamSubscription<dynamic> streamSubscription;
final String roomUuid;
static void spawn(LiveGameRoomData data) {
// Create new isolate for this room
// Return handle for communication
final receivePort = ReceivePort();
data.wsSendPort.send(receivePort.sendPort);
final logger = Logger('LiveGameRoom-${data.roomUuid}');
// ignore: cancel_subscriptions
final streamSubscription = receivePort.listen(logger.info);
LiveGameRoom(
receivePort: receivePort,
wsSendPort: data.wsSendPort,
logger: logger,
streamSubscription: streamSubscription,
roomUuid: data.roomUuid,
).start();
return;
}
void start() {
gameLoop = Timer.periodic(const Duration(milliseconds: 750), update);
}
void update(Timer timer) {
logger.finest('Room $roomUuid tick: ${timer.tick}');
wsSendPort.send(
RoomPingMessage(
roomUuid,
dest: PingDestination.client,
),
);
}
void close() {
streamSubscription.cancel();
}
}
+7 -2
View File
@@ -24,8 +24,13 @@ Middleware tokenAuthMiddleware({
// use `auth.verifyToken(token)` to check the jwt that came in the request header bearer
final authHeader = context.request.headers['authorization'] ?? context.request.headers['Authorization'];
final auths = authHeader?.split(' ');
if (authHeader == null || !authHeader.startsWith('Bearer ') || auths == null || auths.length != 2) {
log.fine('Denied request - No Auth - ${context.request.method.value} ${context.request.uri.path}');
if (authHeader == null ||
!authHeader.toLowerCase().startsWith('bearer') ||
auths == null ||
auths.length != 2) {
log.fine(
'Denied request, no Auth - ${context.request.method.value} ${context.request.uri.path}, Found $auths',
);
return Response(statusCode: HttpStatus.unauthorized);
}
final token = auths.last;
+138
View File
@@ -0,0 +1,138 @@
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';
import 'package:backend/game_room_manager.dart';
import 'package:dart_frog_web_socket/dart_frog_web_socket.dart';
import 'package:logging/logging.dart';
import 'package:shared_models/room.dart';
final _logger = Logger('SocketManager');
class SocketManager {
// Default constructor returns instance
factory SocketManager() {
return instance;
}
// Private constructor
SocketManager._();
// Singleton instance
static final SocketManager instance = SocketManager._();
// Store connections: GameRoomUuid -> UserUuid -> WebSocketChannel
final _connections = <String, Map<String, WebSocketChannel>>{};
// Store isolate port and stream subscription to said port
final _gameRoomSendPorts = <String, SendPort>{};
final _gameRoomSpSubs = <String, StreamSubscription<dynamic>>{};
// Add a new connection
Future<Status> addConnection(
WebSocketChannel connection, {
required String roomUuid,
required String userUuid,
}) async {
_logger.finer('Adding connection to socket manager for user $userUuid in room $roomUuid');
if (!_gameRoomSpSubs.containsKey(roomUuid)) {
final status = await spawnGameRoomIsolate(roomUuid);
if (status == Status.failure) {
return status;
}
}
_connections.putIfAbsent(roomUuid, () => <String, WebSocketChannel>{});
_connections[roomUuid]![userUuid] = connection;
return Status.success;
}
// Remove a connection
void removeConnection(String roomUuid, String userUuid) {
_connections[roomUuid]?.remove(userUuid);
if (_connections[roomUuid]?.isEmpty ?? false) {
_connections.remove(roomUuid);
}
}
// Get a specific user's connection
WebSocketChannel? getConnection(String roomUuid, String userUuid) {
return _connections[roomUuid]?[userUuid];
}
// Get all connections in a room
Map<String, WebSocketChannel>? getRoomConnections(String roomUuid) {
return _connections[roomUuid];
}
// Broadcast message to all users in a room except sender
void broadcastToRoom(String roomUuid, GameRoomMessage message, {String? excludeUserUuid}) {
_logger.fine('Broadcasting ${message.type} to room $roomUuid');
_connections[roomUuid]?.forEach((userUuid, connection) {
if (userUuid != excludeUserUuid) {
connection.sink.add(jsonEncode(message.toJson()));
}
});
}
// Check if a user is connected
bool isConnected(String roomUuid, String userUuid) {
return _connections[roomUuid]?.containsKey(userUuid) ?? false;
}
Future<Status> spawnGameRoomIsolate(String roomUuid) async {
try {
_logger.finest('Spawning isolate for room $roomUuid');
if (_gameRoomSendPorts.containsKey(roomUuid)) {
_logger.severe('Tried to create a sendPort for an existing room uuid: $roomUuid', null, StackTrace.current);
throw Exception('Cannot create sendPort, room already has one');
}
final receivePort = ReceivePort();
final sp = receivePort.sendPort;
await Isolate.spawn(LiveGameRoom.spawn, LiveGameRoomData(roomUuid: roomUuid, wsSendPort: sp));
// used to get sendport
final completer = Completer<SendPort>();
// ignore: cancel_subscriptions
final sub = receivePort.listen((message) {
// first message from new isolate will be its SendPort
if (!completer.isCompleted) {
completer.complete(message as SendPort);
return;
}
if (message is GameRoomMessage) {
switch (message) {
case RoomPingMessage():
if (message.dest != PingDestination.client) {
_logger.warning('Got room ping meant for server');
return;
}
broadcastToRoom(
message.roomUuid,
message,
);
case PlayerVoteMessage():
// TODO: Handle this case.
throw UnimplementedError();
case PingMessage():
// TODO: Handle this case.
throw UnimplementedError();
}
} else {
_logger.info('Unknown message: $message');
}
});
_gameRoomSpSubs[roomUuid] = sub;
_gameRoomSendPorts[roomUuid] = await completer.future;
_logger.info('Spawned new game room $roomUuid, listening to messages from room.');
return Status.success;
} catch (e) {
_logger.severe('Failed to spawn game room isolate', e, StackTrace.current);
_gameRoomSendPorts.remove(roomUuid);
final sub = _gameRoomSpSubs.remove(roomUuid);
await sub?.cancel();
}
return Status.failure;
}
}
enum Status { success, failure }
+1 -1
View File
@@ -1,7 +1,7 @@
import 'dart:io';
import 'dart:math';
import 'package:backend/service/db_access.dart';
import 'package:backend/db/db_access.dart';
import 'package:dart_frog/dart_frog.dart';
import 'package:logging/logging.dart';
import 'package:shared_models/room.dart';
+1 -1
View File
@@ -1,4 +1,4 @@
import 'package:backend/service/db_access.dart';
import 'package:backend/db/db_access.dart';
import 'package:dart_frog/dart_frog.dart';
Future<Response> onRequest(RequestContext context, String roomCode) async {
+34
View File
@@ -0,0 +1,34 @@
import 'package:backend/db/database.dart';
import 'package:backend/db/db_access.dart';
import 'package:backend/socket_manager.dart';
import 'package:dart_frog/dart_frog.dart';
import 'package:dart_frog_web_socket/dart_frog_web_socket.dart';
import 'package:logging/logging.dart';
Future<Response> onRequest(RequestContext context, String roomCode) async {
final logger = Logger('room/[$roomCode]/ws');
final handler = webSocketHandler(protocols: ['game.room.v1'], (channel, protocol) async {
try {
channel.sink.add('test');
logger.finest(protocol);
final room = await Db.getRoomByCode(roomCode);
if (room == null) {
logger.finer('Room not found, aborting...');
await channel.sink.close(4404, 'Room not found');
return;
}
final user = context.read<User>();
final status = await SocketManager().addConnection(channel, roomUuid: room.uuid, userUuid: user.uuid);
if (status == Status.failure) {
logger.finer('Failed to spawn room isolate, closing connection.');
await channel.sink.close(4404, 'Room not found');
return;
}
} catch (e) {
logger.severe('Unexpected error occurred getting websocket connection', e, StackTrace.current);
}
});
return handler(context);
}
-9
View File
@@ -1,9 +0,0 @@
import 'package:dart_frog/dart_frog.dart';
import 'package:dart_frog_web_socket/dart_frog_web_socket.dart';
Future<Response> onRequest(RequestContext context) async {
final handler = webSocketHandler((channel, protocol) {
channel.stream.listen(print);
});
return handler(context);
}
+1 -1
View File
@@ -1,6 +1,6 @@
import 'dart:convert';
import 'package:backend/service/db_access.dart';
import 'package:backend/db/db_access.dart';
import 'package:http/http.dart' as http;
import 'package:shared_models/room.dart';
import 'package:shared_models/user.dart';