WIP ws messages, TODO backend receiving frontend messages

This commit is contained in:
Nathan Anderson 2025-02-19 09:28:13 -07:00
parent b37862a321
commit d2e378b4a3
12 changed files with 104 additions and 153 deletions

View File

@ -4,55 +4,6 @@ import 'dart:isolate';
import 'package:logging/logging.dart';
import 'package:shared_models/room.dart';
// class GameRoomManager {
// GameRoomManager({required SendPort this.socketManagerSendPort}) {
// managerReceivePort = ReceivePort();
// gamePorts = {};
// receiveSubscription = managerReceivePort.listen((message) {
// if (message is GameRoomManagerMessage) {
// handleMessage(message);
// } else if (message is GameRoomMessage) {
// final gameUuid = message.gameUuid;
// final gamePort = gamePorts[gameUuid];
// if (gamePort == null) {
// _logger.warning('Received GameRoomMessage for empty gamePort');
// return;
// }
// gamePort.send(message);
// } else {
// _logger.warning('Received unknown message: $message');
// }
// });
// }
// late final Map<String, SendPort> gamePorts;
// late final ReceivePort managerReceivePort;
// late final StreamSubscription<dynamic> receiveSubscription;
// final SendPort socketManagerSendPort;
// final Logger _logger = Logger('GameRoomManager');
// void close() {
// receiveSubscription.cancel();
// //TODO remove connections
// }
// Future<void> createRoom({required String roomUuid}) async {
// // receivePort
// final wsSendPort = SocketManager().createWsSendPort(roomUuid);
// await Isolate.spawn(LiveGameRoom.spawn, LiveGameRoomData(roomUuid: roomUuid, wsSendPort: wsSendPort, gameManagerSendPort: ));
// // first message from new isolate will be its SendPort
// gamePorts[roomUuid] = await roomReceivePort.first as SendPort;
// }
// void routePlayerToRoom(String roomCode, PlayerConnection player) {
// gamePorts[roomCode]?.addPlayer(player);
// }
// void handleMessage(message) {
// throw UnimplementedError();
// }
// }
class LiveGameRoomData {
LiveGameRoomData({
required this.wsSendPort,
@ -102,15 +53,16 @@ class LiveGameRoom {
}
void start() {
gameLoop = Timer.periodic(const Duration(milliseconds: 750), update);
gameLoop = Timer.periodic(const Duration(milliseconds: 3500), update);
}
void update(Timer timer) {
logger.finest('Room $roomUuid tick: ${timer.tick}');
wsSendPort.send(
RoomPingMessage(
PingMessage.now(
roomUuid,
dest: PingDestination.client,
userUuid: '',
),
);
}

View File

@ -25,7 +25,8 @@ class SocketManager {
final _connections = <String, Map<String, WebSocketChannel>>{};
// Store isolate port and stream subscription to said port
final _gameRoomSendPorts = <String, SendPort>{};
final _gameRoomSpSubs = <String, StreamSubscription<dynamic>>{};
final _gameRoomPortSubs = <String, StreamSubscription<dynamic>>{};
final _gameRoomUserWsSubs = <String, StreamSubscription<dynamic>>{};
// Add a new connection
Future<Status> addConnection(
@ -34,7 +35,7 @@ class SocketManager {
required String userUuid,
}) async {
_logger.finer('Adding connection to socket manager for user $userUuid in room $roomUuid');
if (!_gameRoomSpSubs.containsKey(roomUuid)) {
if (!_gameRoomPortSubs.containsKey(roomUuid)) {
final status = await spawnGameRoomIsolate(roomUuid);
if (status == Status.failure) {
return status;
@ -42,6 +43,11 @@ class SocketManager {
}
_connections.putIfAbsent(roomUuid, () => <String, WebSocketChannel>{});
_connections[roomUuid]![userUuid] = connection;
_logger.fine('Listening to websocket for messages');
// ignore: cancel_subscriptions
final sub = connection.stream.listen(_gameRoomMessageListener);
_gameRoomUserWsSubs[userUuid] = sub;
return Status.success;
}
@ -67,6 +73,9 @@ class SocketManager {
void broadcastToRoom(String roomUuid, GameRoomMessage message, {String? excludeUserUuid}) {
_logger.fine('Broadcasting ${message.type} to room $roomUuid');
_connections[roomUuid]?.forEach((userUuid, connection) {
if (message is PingMessage) {
message.userUuid = userUuid;
}
if (userUuid != excludeUserUuid) {
connection.sink.add(jsonEncode(message.toJson()));
}
@ -92,47 +101,49 @@ class SocketManager {
final completer = Completer<SendPort>();
// ignore: cancel_subscriptions
final sub = receivePort.listen((message) {
// first message from new isolate will be its SendPort
if (!completer.isCompleted) {
completer.complete(message as SendPort);
return;
}
if (message is GameRoomMessage) {
switch (message) {
case RoomPingMessage():
if (message.dest != PingDestination.client) {
_logger.warning('Got room ping meant for server');
return;
}
broadcastToRoom(
message.roomUuid,
message,
);
final sub = receivePort.listen((message) => _gameRoomPortListener(message, completer));
case PlayerVoteMessage():
// TODO: Handle this case.
throw UnimplementedError();
case PingMessage():
// TODO: Handle this case.
throw UnimplementedError();
}
} else {
_logger.info('Unknown message: $message');
}
});
_gameRoomSpSubs[roomUuid] = sub;
_gameRoomPortSubs[roomUuid] = sub;
_gameRoomSendPorts[roomUuid] = await completer.future;
_logger.info('Spawned new game room $roomUuid, listening to messages from room.');
return Status.success;
} catch (e) {
_logger.severe('Failed to spawn game room isolate', e, StackTrace.current);
_gameRoomSendPorts.remove(roomUuid);
final sub = _gameRoomSpSubs.remove(roomUuid);
final sub = _gameRoomPortSubs.remove(roomUuid);
await sub?.cancel();
}
return Status.failure;
}
void _gameRoomPortListener(dynamic message, Completer<SendPort> completer) {
// first message from new isolate will be its SendPort
if (!completer.isCompleted) {
completer.complete(message as SendPort);
return;
}
if (message is GameRoomMessage) {
switch (message) {
case PlayerVoteMessage():
// TODO: Handle this case.
throw UnimplementedError();
case PingMessage():
if (message.dest != PingDestination.client) {
_logger.warning('Got room ping meant for server');
return;
}
broadcastToRoom(
message.roomUuid,
message,
);
}
} else {
_logger.info('Unknown message: $message');
}
}
void _gameRoomMessageListener(dynamic message) {}
}
enum Status { success, failure }

View File

@ -16,17 +16,18 @@ Future<Response> onRequest(RequestContext context) async {
try {
// Parse the request body
final body = await context.request.json();
final createUserReq = CreateUserRequest.fromJson(body as Map<String, dynamic>);
final joinRoomRequest = JoinRoomRequest.fromJson(body as Map<String, dynamic>);
// Generate token
final authenticator = context.read<Authenticator>();
final (token, user) = await authenticator.generateToken(createUserReq);
final (token, user) = await authenticator.generateToken(joinRoomRequest);
if (token == null || user == null) {
final body = CreateUserResponse(
success: false,
token: null,
error: user == null ? 'Room ${createUserReq.roomCode} requested is not available' : 'Unexpected error occurred',
error:
user == null ? 'Room ${joinRoomRequest.roomCode} requested is not available' : 'Unexpected error occurred',
uuid: null,
).toJson();
return Response.json(

View File

@ -10,7 +10,6 @@ Future<Response> onRequest(RequestContext context, String roomCode) async {
final handler = webSocketHandler(protocols: ['game.room.v1'], (channel, protocol) async {
try {
channel.sink.add('test');
logger.finest(protocol);
final room = await Db.getRoomByCode(roomCode);
if (room == null) {

View File

@ -5,6 +5,7 @@ import 'package:frontend/providers/game_messages.dart';
import 'package:frontend/providers/web_socket.dart';
import 'package:go_router/go_router.dart';
import 'package:logging/logging.dart';
import 'package:shared_models/room.dart';
final logger = Logger('GameRoomHome');
@ -30,17 +31,33 @@ class _GameRoomHomeState extends ConsumerState<GameRoomHome> {
if (jwt == null || jwt.roomUuid != widget.roomUuid) {
logger.fine('Tried to open room, but not authenticated / wrong room');
// return home
context.go('/');
WidgetsBinding.instance.addPostFrameCallback((_) => context.go('/'));
}
final connection = ref.watch(webSocketNotifierProvider).valueOrNull;
ref.listen(
gameMessageNotifierProvider,
(previous, next) {
print('Got message: $next');
},
);
if (jwt != null) {
ref.listen(
gameMessageNotifierProvider,
(previous, next) {
final message = next.valueOrNull;
if (message is GameRoomMessage) {
switch (message) {
case PingMessage():
final ping = PingMessage.now(
jwt.roomUuid,
dest: PingDestination.server,
userUuid: jwt.uuid,
);
ref.read(webSocketNotifierProvider.notifier).sendMessage(ping);
case PlayerVoteMessage():
// TODO: Handle this case.
throw UnimplementedError();
}
}
},
);
}
// enstablish ws connection at /room/roomCode/ws and save to gameMessageProvider
return Scaffold(
body: Column(

View File

@ -120,22 +120,6 @@ class _JoinRoomHomeState extends ConsumerState<JoinRoomHome> {
code: _codeController.text,
),
);
// )
// .whenData(
// (response) {
// if (response != null && response.uuid != null) {
// logger.fine('Navigating to room ${response.uuid}');
// // context.go('room/${response.uuid}');
// } else {
// ScaffoldMessenger.of(context).showSnackBar(
// SnackBar(
// content: Text('Unexpected error occurred.'),
// backgroundColor: Colors.red,
// ),
// );
// }
// },
// );
} finally {
setState(() => _isLoading = false);
}

View File

@ -19,15 +19,15 @@ Future<JoinRoomResponse?> joinRoom(Ref ref, {required String username, required
data: JoinRoomRequest(username: username, roomCode: code).toJson(),
);
if (response.statusCode == 200 && response.data != null) {
final joinResponse = JoinRoomResponse.fromJson(response.data!);
final joinResponse = JoinRoomResponse.fromJson(response.data!);
if (joinResponse.success) {
if (joinResponse.token != null) {
logger.fine('Setting token: ${joinResponse.token!.substring(10)}');
await ref.read(jwtNotifierProvider.notifier).setJwt(joinResponse.token!);
}
return joinResponse;
} else {
logger.warning('Could not join room');
logger.warning('Could not join room: ${joinResponse.toJson()}');
}
} catch (e) {
logger.severe('Failed to join room', e, StackTrace.current);

View File

@ -57,7 +57,7 @@ JWTBody? jwtBody(Ref ref) {
final payload = JwtDecoder.tryDecode(jwtString);
if (payload == null) {
logger.fine('Failed to decode JWT, removing key.');
ref.read(jwtNotifierProvider.notifier).eraseJwt();
Future.delayed(const Duration(), () => ref.read(jwtNotifierProvider.notifier).eraseJwt());
return null;
}
try {
@ -66,6 +66,7 @@ JWTBody? jwtBody(Ref ref) {
} catch (e) {
logger.shout(
'Failed to parse JWT payload to JWTBody, something is wrong.\nPayload: $payload', e, StackTrace.current);
Future.delayed(const Duration(), () => ref.read(jwtNotifierProvider.notifier).eraseJwt());
return null;
}
}

View File

@ -15,6 +15,7 @@ Dio dio(Ref ref) {
baseUrl: 'http://localhost:8080',
connectTimeout: const Duration(seconds: 5),
receiveTimeout: const Duration(seconds: 3),
validateStatus: (status) => true,
));
final jwt = ref.watch(jwtNotifierProvider).valueOrNull;

View File

@ -30,7 +30,7 @@ class GameMessageNotifier extends _$GameMessageNotifier {
return null;
}
} catch (e) {
_logger.severe('Error parsing message: `${message.runtimeType}` $message', e, StackTrace.current);
_logger.severe('Error parsing message: Type `${message.runtimeType}` $message', e, StackTrace.current);
return null;
}
});

View File

@ -1,9 +1,11 @@
import 'dart:convert';
import 'dart:io';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:frontend/providers/auth.dart';
import 'package:logging/logging.dart';
import 'package:riverpod_annotation/riverpod_annotation.dart';
import 'package:shared_models/room.dart';
part 'web_socket.g.dart';
@ -41,18 +43,19 @@ class WebSocketNotifier extends _$WebSocketNotifier {
_logger.warning('Error occurred creating web socket: $e');
}
}
}
// @riverpod
// class WebSocketStreamNotifier extends _$WebSocketStreamNotifier {
// @override
// Stream<dynamic> build() {
// final connection = ref.watch(webSocketNotifierProvider).valueOrNull;
// if (connection == null) return Stream.empty();
// _logger.finest('Created broadcast stream from ws connection');
// return connection.asBroadcastStream();
// }
// }
void sendMessage(GameRoomMessage message) {
final msgStr = jsonEncode(message.toJson());
final socket = state.valueOrNull;
if (socket == null) {
// TODO add queue
_logger.info('Socket unavailable... adding to queue');
throw UnimplementedError('No queue available');
}
_logger.finest('Sending message $message on websocket');
socket.add(msgStr);
}
}
@riverpod
Raw<Stream<dynamic>> webSocketStream(Ref ref) {

View File

@ -27,12 +27,11 @@ class CreateRoomResponse {
sealed class GameRoomMessage {
GameRoomMessage(this.roomUuid);
final String roomUuid;
abstract final String type;
abstract String? type;
factory GameRoomMessage.fromJson(Map<String, dynamic> json) {
return switch (json['type']) {
'ping' => PingMessage.fromJson(json),
'roomPing' => RoomPingMessage.fromJson(json),
'playerVote' => PlayerVoteMessage.fromJson(json),
_ => throw Exception('Unknown message type'),
};
@ -45,41 +44,24 @@ enum PingDestination { client, server }
@JsonSerializable()
class PingMessage extends GameRoomMessage {
DateTime timestamp;
late final DateTime timestamp;
final PingDestination dest;
final String userUuid;
String userUuid;
PingMessage(super.roomUuid, {required this.dest, required this.userUuid}) {
timestamp = DateTime.now();
PingMessage(super.roomUuid, {required this.dest, required this.userUuid, required this.timestamp}) {
type = 'ping';
}
factory PingMessage.now(String roomUuid, {required PingDestination dest, required String userUuid}) =>
PingMessage(roomUuid, userUuid: userUuid, dest: dest, timestamp: DateTime.now());
factory PingMessage.fromJson(Map<String, dynamic> json) => _$PingMessageFromJson(json);
@override
Map<String, dynamic> toJson() => _$PingMessageToJson(this);
@override
late final String type;
}
@JsonSerializable()
class RoomPingMessage extends GameRoomMessage {
late final DateTime timestamp;
final PingDestination dest;
RoomPingMessage(super.roomUuid, {required this.dest}) {
timestamp = DateTime.now();
type = 'roomPing';
}
factory RoomPingMessage.fromJson(Map<String, dynamic> json) => _$RoomPingMessageFromJson(json);
@override
Map<String, dynamic> toJson() => _$RoomPingMessageToJson(this);
@override
late final String type;
String? type;
}
@JsonSerializable()
@ -97,5 +79,5 @@ class PlayerVoteMessage extends GameRoomMessage {
Map<String, dynamic> toJson() => _$PlayerVoteMessageToJson(this);
@override
late final String type;
String? type;
}