From b37862a321283e205987e410e6383632f0944492 Mon Sep 17 00:00:00 2001
From: Nate Anderson <nate.anderson@vasion.com>
Date: Mon, 10 Feb 2025 18:55:15 -0700
Subject: [PATCH] Mostly working websocket stuff, some message weirdness at the
 moment...

---
 backend/lib/authenticator.dart              |  12 +-
 backend/lib/{ => db}/database.dart          |   0
 backend/lib/{service => db}/db_access.dart  |  34 +++--
 backend/lib/game_room_manager.dart          | 121 +++++++++++++++++
 backend/lib/middleware/auth_middleware.dart |   9 +-
 backend/lib/socket_manager.dart             | 138 ++++++++++++++++++++
 backend/routes/create_room.dart             |   2 +-
 backend/routes/room/[roomCode]/ping.dart    |   2 +-
 backend/routes/room/[roomCode]/ws.dart      |  34 +++++
 backend/routes/ws.dart                      |   9 --
 backend/test_e2e/tests/game_room_test.dart  |   2 +-
 frontend/lib/features/room/game_room.dart   |  36 ++++-
 frontend/lib/features/room/join_room.dart   |  10 +-
 frontend/lib/providers/auth.dart            |   8 +-
 frontend/lib/providers/dio.dart             |  33 ++++-
 frontend/lib/providers/game_messages.dart   |  46 ++++---
 frontend/lib/providers/web_socket.dart      |  63 +++++++++
 shared_models/lib/jwt.dart                  |   5 +-
 shared_models/lib/room.dart                 |  66 +++++++---
 19 files changed, 543 insertions(+), 87 deletions(-)
 rename backend/lib/{ => db}/database.dart (100%)
 rename backend/lib/{service => db}/db_access.dart (66%)
 create mode 100644 backend/lib/game_room_manager.dart
 create mode 100644 backend/lib/socket_manager.dart
 create mode 100644 backend/routes/room/[roomCode]/ws.dart
 delete mode 100644 backend/routes/ws.dart
 create mode 100644 frontend/lib/providers/web_socket.dart

diff --git a/backend/lib/authenticator.dart b/backend/lib/authenticator.dart
index 46db183..ad6037b 100644
--- a/backend/lib/authenticator.dart
+++ b/backend/lib/authenticator.dart
@@ -1,5 +1,5 @@
-import 'package:backend/database.dart';
-import 'package:backend/service/db_access.dart';
+import 'package:backend/db/database.dart';
+import 'package:backend/db/db_access.dart';
 import 'package:backend/utils/environment.dart';
 import 'package:dart_jsonwebtoken/dart_jsonwebtoken.dart';
 import 'package:logging/logging.dart';
@@ -25,7 +25,13 @@ class Authenticator {
     final iat = DateTime.now().millisecondsSinceEpoch ~/ 1000;
     final jwt = JWT(
       header: {'algo': 'HS256'},
-      JWTBody(uuid: newUser.uuid, roomUuid: newUser.gameRoomUuid, iat: iat, exp: iat + expTimeSecs).toJson(),
+      JWTBody(
+        uuid: newUser.uuid,
+        roomUuid: newUser.gameRoomUuid,
+        roomCode: req.roomCode,
+        iat: iat,
+        exp: iat + expTimeSecs,
+      ).toJson(),
     );
 
     return (jwt.sign(SecretKey(jwtSecret!)), newUser);
diff --git a/backend/lib/database.dart b/backend/lib/db/database.dart
similarity index 100%
rename from backend/lib/database.dart
rename to backend/lib/db/database.dart
diff --git a/backend/lib/service/db_access.dart b/backend/lib/db/db_access.dart
similarity index 66%
rename from backend/lib/service/db_access.dart
rename to backend/lib/db/db_access.dart
index b576ff0..4d08292 100644
--- a/backend/lib/service/db_access.dart
+++ b/backend/lib/db/db_access.dart
@@ -1,4 +1,4 @@
-import 'package:backend/database.dart';
+import 'package:backend/db/database.dart';
 import 'package:drift/drift.dart';
 import 'package:logging/logging.dart';
 import 'package:uuid/uuid.dart';
@@ -9,11 +9,12 @@ class Db {
   static final _db = AppDatabase();
 
   static Future<User?> getUserById(String uuid) {
-    log.finer('Getting user $uuid');
+    log.finest('Getting user $uuid');
     return _db.managers.users.filter((f) => f.uuid.equals(uuid)).getSingleOrNull();
   }
 
   static Future<User?> createUser({required String username, required String roomCode}) async {
+    log.finest('Creating user $username in room $roomCode');
     final room = await _db.managers.gameRooms
         .filter((f) => f.code.equals(roomCode) & f.status.isIn([GameStatus.open, GameStatus.running]))
         .getSingleOrNull()
@@ -32,21 +33,30 @@ class Db {
     });
   }
 
-  static Future<GameRoom?> createRoom({required String roomCode}) => _db.managers.gameRooms
-          .createReturningOrNull(
-        (o) => o(createdAt: Value(DateTime.now()), status: GameStatus.open, uuid: const Uuid().v4(), code: roomCode),
-      )
-          .catchError(
-        (Object err) {
-          log.severe('Failed to create room', err, StackTrace.current);
-          return null;
-        },
-      );
+  static Future<GameRoom?> createRoom({required String roomCode}) {
+    log.finest('Creating room with code $roomCode');
+    return _db.managers.gameRooms
+        .createReturningOrNull(
+      (o) => o(createdAt: Value(DateTime.now()), status: GameStatus.open, uuid: const Uuid().v4(), code: roomCode),
+    )
+        .catchError(
+      (Object err) {
+        log.severe('Failed to create room', err, StackTrace.current);
+        return null;
+      },
+    );
+  }
 
   static Future<GameRoom?> getRoomByCode(String? roomCode) async {
+    log.finest('Getting room by code $roomCode');
     final room = await _db.managers.gameRooms
         .filter((f) => f.code.equals(roomCode) & f.status.isIn([GameStatus.open, GameStatus.running]))
         .getSingleOrNull();
     return room;
   }
+
+  static Future<GameRoom?> getRoomById(String roomUuid) async {
+    log.finest('Getting room $roomUuid');
+    throw UnimplementedError();
+  }
 }
diff --git a/backend/lib/game_room_manager.dart b/backend/lib/game_room_manager.dart
new file mode 100644
index 0000000..0f90468
--- /dev/null
+++ b/backend/lib/game_room_manager.dart
@@ -0,0 +1,121 @@
+import 'dart:async';
+import 'dart:isolate';
+
+import 'package:logging/logging.dart';
+import 'package:shared_models/room.dart';
+
+// class GameRoomManager {
+//   GameRoomManager({required SendPort this.socketManagerSendPort}) {
+//     managerReceivePort = ReceivePort();
+//     gamePorts = {};
+//     receiveSubscription = managerReceivePort.listen((message) {
+//       if (message is GameRoomManagerMessage) {
+//         handleMessage(message);
+//       } else if (message is GameRoomMessage) {
+//         final gameUuid = message.gameUuid;
+//         final gamePort = gamePorts[gameUuid];
+//         if (gamePort == null) {
+//           _logger.warning('Received GameRoomMessage for empty gamePort');
+//           return;
+//         }
+//         gamePort.send(message);
+//       } else {
+//         _logger.warning('Received unknown message: $message');
+//       }
+//     });
+//   }
+
+//   late final Map<String, SendPort> gamePorts;
+//   late final ReceivePort managerReceivePort;
+//   late final StreamSubscription<dynamic> receiveSubscription;
+//   final SendPort socketManagerSendPort;
+//   final Logger _logger = Logger('GameRoomManager');
+
+//   void close() {
+//     receiveSubscription.cancel();
+//     //TODO remove connections
+//   }
+
+//   Future<void> createRoom({required String roomUuid}) async {
+//     // receivePort
+//     final wsSendPort = SocketManager().createWsSendPort(roomUuid);
+//     await Isolate.spawn(LiveGameRoom.spawn, LiveGameRoomData(roomUuid: roomUuid, wsSendPort: wsSendPort, gameManagerSendPort: ));
+//     // first message from new isolate will be its SendPort
+//     gamePorts[roomUuid] = await roomReceivePort.first as SendPort;
+//   }
+
+//   void routePlayerToRoom(String roomCode, PlayerConnection player) {
+//     gamePorts[roomCode]?.addPlayer(player);
+//   }
+
+//   void handleMessage(message) {
+//     throw UnimplementedError();
+//   }
+// }
+
+class LiveGameRoomData {
+  LiveGameRoomData({
+    required this.wsSendPort,
+    required this.roomUuid,
+  });
+
+  final SendPort wsSendPort;
+  final String roomUuid;
+}
+
+class LiveGameRoom {
+  LiveGameRoom({
+    required this.receivePort,
+    required this.wsSendPort,
+    required this.logger,
+    required this.streamSubscription,
+    required this.roomUuid,
+  });
+
+  Timer? gameLoop;
+  // final Map<String, PlayerState> players = {};
+  final ReceivePort receivePort;
+  final SendPort wsSendPort;
+  final Logger logger;
+  final StreamSubscription<dynamic> streamSubscription;
+  final String roomUuid;
+
+  static void spawn(LiveGameRoomData data) {
+    // Create new isolate for this room
+    // Return handle for communication
+    final receivePort = ReceivePort();
+    data.wsSendPort.send(receivePort.sendPort);
+    final logger = Logger('LiveGameRoom-${data.roomUuid}');
+
+    // ignore: cancel_subscriptions
+    final streamSubscription = receivePort.listen(logger.info);
+
+    LiveGameRoom(
+      receivePort: receivePort,
+      wsSendPort: data.wsSendPort,
+      logger: logger,
+      streamSubscription: streamSubscription,
+      roomUuid: data.roomUuid,
+    ).start();
+
+    return;
+  }
+
+  void start() {
+    gameLoop = Timer.periodic(const Duration(milliseconds: 750), update);
+  }
+
+  void update(Timer timer) {
+    logger.finest('Room $roomUuid tick: ${timer.tick}');
+    wsSendPort.send(
+      RoomPingMessage(
+        roomUuid,
+        dest: PingDestination.client,
+      ),
+    );
+  }
+
+  void close() {
+    streamSubscription.cancel();
+  }
+}
diff --git a/backend/lib/middleware/auth_middleware.dart b/backend/lib/middleware/auth_middleware.dart
index 294f7a0..9fceaba 100644
--- a/backend/lib/middleware/auth_middleware.dart
+++ b/backend/lib/middleware/auth_middleware.dart
@@ -24,8 +24,13 @@ Middleware tokenAuthMiddleware({
         // use `auth.verifyToken(token)` to check the jwt that came in the request header bearer
         final authHeader = context.request.headers['authorization'] ?? context.request.headers['Authorization'];
         final auths = authHeader?.split(' ');
-        if (authHeader == null || !authHeader.startsWith('Bearer ') || auths == null || auths.length != 2) {
-          log.fine('Denied request - No Auth - ${context.request.method.value} ${context.request.uri.path}');
+        if (authHeader == null ||
+            !authHeader.toLowerCase().startsWith('bearer') ||
+            auths == null ||
+            auths.length != 2) {
+          log.fine(
+            'Denied request, no Auth - ${context.request.method.value} ${context.request.uri.path}, Found $auths',
+          );
           return Response(statusCode: HttpStatus.unauthorized);
         }
         final token = auths.last;
diff --git a/backend/lib/socket_manager.dart b/backend/lib/socket_manager.dart
new file mode 100644
index 0000000..b0b1619
--- /dev/null
+++ b/backend/lib/socket_manager.dart
@@ -0,0 +1,138 @@
+import 'dart:async';
+import 'dart:convert';
+import 'dart:isolate';
+
+import 'package:backend/game_room_manager.dart';
+import 'package:dart_frog_web_socket/dart_frog_web_socket.dart';
+import 'package:logging/logging.dart';
+import 'package:shared_models/room.dart';
+
+final _logger = Logger('SocketManager');
+
+class SocketManager {
+  // Default constructor returns instance
+  factory SocketManager() {
+    return instance;
+  }
+
+  // Private constructor
+  SocketManager._();
+
+  // Singleton instance
+  static final SocketManager instance = SocketManager._();
+
+  // Store connections: GameRoomUuid -> UserUuid -> WebSocketChannel
+  final _connections = <String, Map<String, WebSocketChannel>>{};
+  // Store isolate port and stream subscription to said port
+  final _gameRoomSendPorts = <String, SendPort>{};
+  final _gameRoomSpSubs = <String, StreamSubscription<dynamic>>{};
+
+  // Add a new connection
+  Future<Status> addConnection(
+    WebSocketChannel connection, {
+    required String roomUuid,
+    required String userUuid,
+  }) async {
+    _logger.finer('Adding connection to socket manager for user $userUuid in room $roomUuid');
+    if (!_gameRoomSpSubs.containsKey(roomUuid)) {
+      final status = await spawnGameRoomIsolate(roomUuid);
+      if (status == Status.failure) {
+        return status;
+      }
+    }
+    _connections.putIfAbsent(roomUuid, () => <String, WebSocketChannel>{});
+    _connections[roomUuid]![userUuid] = connection;
+    return Status.success;
+  }
+
+  // Remove a connection
+  void removeConnection(String roomUuid, String userUuid) {
+    _connections[roomUuid]?.remove(userUuid);
+    if (_connections[roomUuid]?.isEmpty ?? false) {
+      _connections.remove(roomUuid);
+    }
+  }
+
+  // Get a specific user's connection
+  WebSocketChannel? getConnection(String roomUuid, String userUuid) {
+    return _connections[roomUuid]?[userUuid];
+  }
+
+  // Get all connections in a room
+  Map<String, WebSocketChannel>? getRoomConnections(String roomUuid) {
+    return _connections[roomUuid];
+  }
+
+  // Broadcast message to all users in a room except sender
+  void broadcastToRoom(String roomUuid, GameRoomMessage message, {String? excludeUserUuid}) {
+    _logger.fine('Broadcasting ${message.type} to room $roomUuid');
+    _connections[roomUuid]?.forEach((userUuid, connection) {
+      if (userUuid != excludeUserUuid) {
+        connection.sink.add(jsonEncode(message.toJson()));
+      }
+    });
+  }
+
+  // Check if a user is connected
+  bool isConnected(String roomUuid, String userUuid) {
+    return _connections[roomUuid]?.containsKey(userUuid) ?? false;
+  }
+
+  Future<Status> spawnGameRoomIsolate(String roomUuid) async {
+    try {
+      _logger.finest('Spawning isolate for room $roomUuid');
+      if (_gameRoomSendPorts.containsKey(roomUuid)) {
+        _logger.severe('Tried to create a sendPort for an existing room uuid: $roomUuid', null, StackTrace.current);
+        throw Exception('Cannot create sendPort, room already has one');
+      }
+      final receivePort = ReceivePort();
+      final sp = receivePort.sendPort;
+      await Isolate.spawn(LiveGameRoom.spawn, LiveGameRoomData(roomUuid: roomUuid, wsSendPort: sp));
+      // used to get sendport
+      final completer = Completer<SendPort>();
+
+      // ignore: cancel_subscriptions
+      final sub = receivePort.listen((message) {
+        // first message from new isolate will be its SendPort
+        if (!completer.isCompleted) {
+          completer.complete(message as SendPort);
+          return;
+        }
+        if (message is GameRoomMessage) {
+          switch (message) {
+            case RoomPingMessage():
+              if (message.dest != PingDestination.client) {
+                _logger.warning('Got room ping meant for server');
+                return;
+              }
+              broadcastToRoom(
+                message.roomUuid,
+                message,
+              );
+
+            case PlayerVoteMessage():
+              // TODO: Handle this case.
+              throw UnimplementedError();
+            case PingMessage():
+              // TODO: Handle this case.
+              throw UnimplementedError();
+          }
+        } else {
+          _logger.info('Unknown message: $message');
+        }
+      });
+      _gameRoomSpSubs[roomUuid] = sub;
+      _gameRoomSendPorts[roomUuid] = await completer.future;
+      _logger.info('Spawned new game room $roomUuid, listening to messages from room.');
+      return Status.success;
+    } catch (e) {
+      _logger.severe('Failed to spawn game room isolate', e, StackTrace.current);
+      _gameRoomSendPorts.remove(roomUuid);
+      final sub = _gameRoomSpSubs.remove(roomUuid);
+      await sub?.cancel();
+    }
+    return Status.failure;
+  }
+}
+
+enum Status { success, failure }
diff --git a/backend/routes/create_room.dart b/backend/routes/create_room.dart
index 9ae7641..2f6d28f 100644
--- a/backend/routes/create_room.dart
+++ b/backend/routes/create_room.dart
@@ -1,7 +1,7 @@
 import 'dart:io';
 import 'dart:math';
 
-import 'package:backend/service/db_access.dart';
+import 'package:backend/db/db_access.dart';
 import 'package:dart_frog/dart_frog.dart';
 import 'package:logging/logging.dart';
 import 'package:shared_models/room.dart';
diff --git a/backend/routes/room/[roomCode]/ping.dart b/backend/routes/room/[roomCode]/ping.dart
index 544f9cd..0f12590 100644
--- a/backend/routes/room/[roomCode]/ping.dart
+++ b/backend/routes/room/[roomCode]/ping.dart
@@ -1,4 +1,4 @@
-import 'package:backend/service/db_access.dart';
+import 'package:backend/db/db_access.dart';
 import 'package:dart_frog/dart_frog.dart';
 
 Future<Response> onRequest(RequestContext context, String roomCode) async {
diff --git a/backend/routes/room/[roomCode]/ws.dart b/backend/routes/room/[roomCode]/ws.dart
new file mode 100644
index 0000000..6142348
--- /dev/null
+++ b/backend/routes/room/[roomCode]/ws.dart
@@ -0,0 +1,34 @@
+import 'package:backend/db/database.dart';
+import 'package:backend/db/db_access.dart';
+import 'package:backend/socket_manager.dart';
+import 'package:dart_frog/dart_frog.dart';
+import 'package:dart_frog_web_socket/dart_frog_web_socket.dart';
+import 'package:logging/logging.dart';
+
+Future<Response> onRequest(RequestContext context, String roomCode) async {
+  final logger = Logger('room/[$roomCode]/ws');
+
+  final handler = webSocketHandler(protocols: ['game.room.v1'], (channel, protocol) async {
+    try {
+      channel.sink.add('test');
+      logger.finest(protocol);
+      final room = await Db.getRoomByCode(roomCode);
+      if (room == null) {
+        logger.finer('Room not found, aborting...');
+        await channel.sink.close(4404, 'Room not found');
+        return;
+      }
+      final user = context.read<User>();
+
+      final status = await SocketManager().addConnection(channel, roomUuid: room.uuid, userUuid: user.uuid);
+      if (status == Status.failure) {
+        logger.finer('Failed to spawn room isolate, closing connection.');
+        await channel.sink.close(4404, 'Room not found');
+        return;
+      }
+    } catch (e) {
+      logger.severe('Unexpected error occurred getting websocket connection', e, StackTrace.current);
+    }
+  });
+  return handler(context);
+}
diff --git a/backend/routes/ws.dart b/backend/routes/ws.dart
deleted file mode 100644
index 8142840..0000000
--- a/backend/routes/ws.dart
+++ /dev/null
@@ -1,9 +0,0 @@
-import 'package:dart_frog/dart_frog.dart';
-import 'package:dart_frog_web_socket/dart_frog_web_socket.dart';
-
-Future<Response> onRequest(RequestContext context) async {
-  final handler = webSocketHandler((channel, protocol) {
-    channel.stream.listen(print);
-  });
-  return handler(context);
-}
diff --git a/backend/test_e2e/tests/game_room_test.dart b/backend/test_e2e/tests/game_room_test.dart
index e0ed32e..defc705 100644
--- a/backend/test_e2e/tests/game_room_test.dart
+++ b/backend/test_e2e/tests/game_room_test.dart
@@ -1,6 +1,6 @@
 import 'dart:convert';
 
-import 'package:backend/service/db_access.dart';
+import 'package:backend/db/db_access.dart';
 import 'package:http/http.dart' as http;
 import 'package:shared_models/room.dart';
 import 'package:shared_models/user.dart';
diff --git a/frontend/lib/features/room/game_room.dart b/frontend/lib/features/room/game_room.dart
index 78c9a91..3069d46 100644
--- a/frontend/lib/features/room/game_room.dart
+++ b/frontend/lib/features/room/game_room.dart
@@ -2,6 +2,7 @@ import 'package:flutter/material.dart';
 import 'package:flutter_riverpod/flutter_riverpod.dart';
 import 'package:frontend/providers/auth.dart';
 import 'package:frontend/providers/game_messages.dart';
+import 'package:frontend/providers/web_socket.dart';
 import 'package:go_router/go_router.dart';
 import 'package:logging/logging.dart';
 
@@ -17,6 +18,12 @@ class GameRoomHome extends ConsumerStatefulWidget {
 }
 
 class _GameRoomHomeState extends ConsumerState<GameRoomHome> {
+  @override
+  void initState() {
+    WidgetsBinding.instance.addPostFrameCallback((_) => ref.read(webSocketNotifierProvider.notifier).connect());
+    super.initState();
+  }
+
   @override
   Widget build(BuildContext context) {
     final jwt = ref.watch(jwtBodyProvider);
@@ -25,13 +32,30 @@ class _GameRoomHomeState extends ConsumerState<GameRoomHome> {
       // return home
       context.go('/');
     }
+
+    final connection = ref.watch(webSocketNotifierProvider).valueOrNull;
+
+    ref.listen(
+      gameMessageNotifierProvider,
+      (previous, next) {
+        print('Got message: $next');
+      },
+    );
     // enstablish ws connection at /room/roomCode/ws and save to gameMessageProvider
-    ref.read(gameMessageNotifierProvider.notifier).connect(jwt!.roomUuid);
-    return Column(
-      children: [
-        Text('Authenticated.'),
-        Text('Welcome to room ${widget.roomUuid}'),
-      ],
+    return Scaffold(
+      body: Column(
+        children: [
+          Text('Authenticated.'),
+          Text('Welcome to room ${widget.roomUuid}'),
+          ElevatedButton(
+              onPressed: connection == null
+                  ? null
+                  : () {
+                      connection.add('Test message');
+                    },
+              child: Text('Send message on socket')),
+        ],
+      ),
     );
   }
 }
diff --git a/frontend/lib/features/room/join_room.dart b/frontend/lib/features/room/join_room.dart
index 43883dd..d48894b 100644
--- a/frontend/lib/features/room/join_room.dart
+++ b/frontend/lib/features/room/join_room.dart
@@ -30,16 +30,14 @@ class _JoinRoomHomeState extends ConsumerState<JoinRoomHome> {
 
   @override
   Widget build(BuildContext context) {
-    final jwtAsync = ref.watch(jwtNotifierProvider);
+    final jwtBody = ref.watch(jwtBodyProvider);
 
-    jwtAsync.whenData((jwt) {
-      logger.fine('Got jwt: ${jwt == null ? 'NULL' : jwt.toString().substring(10)}');
-      if (jwt == null) return;
+    if (jwtBody != null) {
       logger.fine('Navigating to game room screen');
       WidgetsBinding.instance.addPostFrameCallback(
-        (_) => context.go('/room/${jwt.roomUuid}'),
+        (_) => context.go('/room/${jwtBody.roomUuid}'),
       );
-    });
+    }
 
     return Scaffold(
       body: Padding(
diff --git a/frontend/lib/providers/auth.dart b/frontend/lib/providers/auth.dart
index 1875d9e..157d858 100644
--- a/frontend/lib/providers/auth.dart
+++ b/frontend/lib/providers/auth.dart
@@ -40,6 +40,11 @@ class JwtNotifier extends _$JwtNotifier {
 
     state = AsyncValue.data(jwt);
   }
+
+  Future<void> eraseJwt() async {
+    SharedPreferencesAsync().remove('jwt');
+    state = AsyncValue.data(null);
+  }
 }
 
 @riverpod
@@ -52,8 +57,7 @@ JWTBody? jwtBody(Ref ref) {
   final payload = JwtDecoder.tryDecode(jwtString);
   if (payload == null) {
     logger.fine('Failed to decode JWT, removing key.');
-    SharedPreferencesAsync().remove('jwt');
-    ref.invalidate(jwtNotifierProvider);
+    ref.read(jwtNotifierProvider.notifier).eraseJwt();
     return null;
   }
   try {
diff --git a/frontend/lib/providers/dio.dart b/frontend/lib/providers/dio.dart
index 9cc0dda..9f566f8 100644
--- a/frontend/lib/providers/dio.dart
+++ b/frontend/lib/providers/dio.dart
@@ -3,6 +3,7 @@ import 'package:flutter_riverpod/flutter_riverpod.dart';
 import 'package:frontend/providers/auth.dart';
 import 'package:logging/logging.dart';
 import 'package:riverpod_annotation/riverpod_annotation.dart';
+import 'package:shared_models/jwt.dart';
 
 part 'dio.g.dart';
 
@@ -17,8 +18,13 @@ Dio dio(Ref ref) {
   ));
 
   final jwt = ref.watch(jwtNotifierProvider).valueOrNull;
+  final jwtBody = ref.read(jwtBodyProvider);
 
-  dio.interceptors.addAll([JwtInterceptor(jwt: jwt), CustomLogInterceptor()]);
+  dio.interceptors.addAll([
+    JwtInterceptor(
+        jwt: jwt, jwtBody: jwtBody, invalidateJwtCallback: () => ref.read(jwtNotifierProvider.notifier).eraseJwt()),
+    CustomLogInterceptor()
+  ]);
 
   logger.fine('Created new Dio object');
 
@@ -28,15 +34,36 @@ Dio dio(Ref ref) {
 // Adds the jwt to
 class JwtInterceptor extends Interceptor {
   final String? jwt;
+  final JWTBody? jwtBody;
+  final Function invalidateJwtCallback;
 
-  JwtInterceptor({required this.jwt});
+  JwtInterceptor({required this.jwt, required this.jwtBody, required this.invalidateJwtCallback});
 
   @override
   void onRequest(RequestOptions options, RequestInterceptorHandler handler) {
+    if (jwt == null || jwtBody == null) {
+      handler.next(options);
+      return;
+    }
+    if (jwtBody != null && jwtBody!.exp < DateTime.now().millisecondsSinceEpoch ~/ 1000) {
+      invalidateJwtCallback();
+      handler.next(options);
+      return;
+    }
     if (jwt != null) {
       options.headers['Authorization'] = 'Bearer $jwt';
+      handler.next(options);
     }
-    handler.next(options);
+  }
+
+  // on unauthorized request, remove jwt
+  @override
+  // ignore: strict_raw_type
+  void onResponse(Response response, ResponseInterceptorHandler handler) {
+    if (response.statusCode == 401) {
+      invalidateJwtCallback();
+    }
+    handler.next(response);
   }
 }
 
diff --git a/frontend/lib/providers/game_messages.dart b/frontend/lib/providers/game_messages.dart
index 29c6adf..2d94983 100644
--- a/frontend/lib/providers/game_messages.dart
+++ b/frontend/lib/providers/game_messages.dart
@@ -1,43 +1,47 @@
+import 'dart:async';
 import 'dart:convert';
-import 'dart:io';
 
-import 'package:flutter_riverpod/flutter_riverpod.dart';
-import 'package:frontend/providers/dio.dart';
+import 'package:frontend/providers/web_socket.dart';
+import 'package:logging/logging.dart';
 import 'package:riverpod_annotation/riverpod_annotation.dart';
 import 'package:shared_models/room.dart';
 
 part 'game_messages.g.dart';
 
+final _logger = Logger('GameMessageNotifier');
+
 @riverpod
 class GameMessageNotifier extends _$GameMessageNotifier {
+  StreamSubscription<GameRoomMessage?>? _sub;
+
   @override
-  Stream<GameRoomMessage> build() {
-    return Stream.empty();
-  }
+  Stream<GameRoomMessage?> build() {
+    final Stream<dynamic>? stream = ref.watch(webSocketStreamProvider);
+    if (stream == null) {
+      return Stream.empty();
+    }
 
-  Future<void> connect(String gameRoomUuid) async {
-    final dio = ref.read(dioProvider);
-    Uri.parse('ws://localhost:8080/room/$gameRoomUuid/ws');
-
-    // connect to websocket and then set stream of websocket to state
-    final wsUrl = Uri.parse('ws://localhost:8080/room/$gameRoomUuid/ws');
-
-    final connection = await WebSocket.connect(wsUrl.toString());
-
-    final Stream<GameRoomMessage> gameRoomStream = connection.map((message) {
+    final Stream<GameRoomMessage?> gameRoomStream = stream.map((message) {
       try {
         if (message is String) {
-          GameRoomMessage.fromJson(jsonDecode(message) as Map<String, dynamic>);
+          return GameRoomMessage.fromJson(jsonDecode(message) as Map<String, dynamic>);
         } else {
-          logger.info('Recieved non-string message in socket: $message');
+          _logger.info('Recieved non-string message in socket: $message');
+          return null;
         }
       } catch (e) {
-        print('Error parsing message: $e');
-        rethrow;
+        _logger.severe('Error parsing message: `${message.runtimeType}` $message', e, StackTrace.current);
+        return null;
       }
     });
-    gameRoomStream.listen(
+    _sub = gameRoomStream.listen(
       (event) => state = AsyncValue.data(event),
     );
+    return gameRoomStream;
+  }
+
+  // Cancel the gameroom stream subscription
+  void close() {
+    _sub?.cancel();
   }
 }
diff --git a/frontend/lib/providers/web_socket.dart b/frontend/lib/providers/web_socket.dart
new file mode 100644
index 0000000..07db8b5
--- /dev/null
+++ b/frontend/lib/providers/web_socket.dart
@@ -0,0 +1,63 @@
+import 'dart:io';
+
+import 'package:flutter_riverpod/flutter_riverpod.dart';
+import 'package:frontend/providers/auth.dart';
+import 'package:logging/logging.dart';
+import 'package:riverpod_annotation/riverpod_annotation.dart';
+
+part 'web_socket.g.dart';
+
+final _logger = Logger('WebSocketNotifier');
+
+@riverpod
+class WebSocketNotifier extends _$WebSocketNotifier {
+  @override
+  Future<WebSocket?> build() async {
+    return null;
+  }
+
+  Future<void> connect() async {
+    state = AsyncValue.loading();
+    final jwt = ref.read(jwtNotifierProvider).valueOrNull;
+    final jwtBody = ref.read(jwtBodyProvider);
+    if (jwt == null || jwtBody == null) {
+      _logger.warning('Tried to connect to ws without jwt token');
+      return;
+    }
+
+    final wsUrl = Uri.parse('ws://localhost:8080/room/${jwtBody.roomCode}/ws');
+    _logger.finest('Attempting to connect to $wsUrl');
+
+    try {
+      final connection = await WebSocket.connect(wsUrl.toString(), headers: {'Authorization': 'Bearer: $jwt'});
+      _logger.fine('Client ws connection established to room ${jwtBody.roomUuid}');
+      state = AsyncValue.data(connection);
+    } catch (e) {
+      if (e is WebSocketException) {
+        if (e.httpStatusCode == 401) {
+          ref.read(jwtNotifierProvider.notifier).eraseJwt();
+        }
+      }
+      _logger.warning('Error occurred creating web socket: $e');
+    }
+  }
+}
+
+// @riverpod
+// class WebSocketStreamNotifier extends _$WebSocketStreamNotifier {
+//   @override
+//   Stream<dynamic> build() {
+//     final connection = ref.watch(webSocketNotifierProvider).valueOrNull;
+//     if (connection == null) return Stream.empty();
+//     _logger.finest('Created broadcast stream from ws connection');
+//     return connection.asBroadcastStream();
+//   }
+// }
+
+@riverpod
+Raw<Stream<dynamic>> webSocketStream(Ref ref) {
+  final connection = ref.watch(webSocketNotifierProvider).valueOrNull;
+  if (connection == null) return Stream.empty();
+  _logger.finest('Created broadcast stream from ws connection');
+  return connection.asBroadcastStream();
+}
diff --git a/shared_models/lib/jwt.dart b/shared_models/lib/jwt.dart
index 1f9291d..0f7ae05 100644
--- a/shared_models/lib/jwt.dart
+++ b/shared_models/lib/jwt.dart
@@ -6,10 +6,13 @@ part 'jwt.g.dart';
 class JWTBody {
   String uuid;
   String roomUuid;
+  String roomCode;
+  // Issued at in epoch seconds
   int iat;
+  // Expires at in epoch seconds
   int exp;
 
-  JWTBody({required this.uuid, required this.roomUuid, required this.iat, required this.exp});
+  JWTBody({required this.uuid, required this.roomUuid, required this.roomCode, required this.iat, required this.exp});
 
   factory JWTBody.fromJson(Map<String, dynamic> json) => _$JWTBodyFromJson(json);
 
diff --git a/shared_models/lib/room.dart b/shared_models/lib/room.dart
index aade6ae..b226a46 100644
--- a/shared_models/lib/room.dart
+++ b/shared_models/lib/room.dart
@@ -25,11 +25,14 @@ class CreateRoomResponse {
 }
 
 sealed class GameRoomMessage {
-  const GameRoomMessage();
+  GameRoomMessage(this.roomUuid);
+  final String roomUuid;
+  abstract final String type;
 
   factory GameRoomMessage.fromJson(Map<String, dynamic> json) {
     return switch (json['type']) {
       'ping' => PingMessage.fromJson(json),
+      'roomPing' => RoomPingMessage.fromJson(json),
       'playerVote' => PlayerVoteMessage.fromJson(json),
       _ => throw Exception('Unknown message type'),
     };
@@ -38,36 +41,61 @@ sealed class GameRoomMessage {
   Map<String, dynamic> toJson();
 }
 
+enum PingDestination { client, server }
+
+@JsonSerializable()
 class PingMessage extends GameRoomMessage {
-  final DateTime timestamp;
+  DateTime timestamp;
+  final PingDestination dest;
+  final String userUuid;
 
-  const PingMessage({required this.timestamp});
+  PingMessage(super.roomUuid, {required this.dest, required this.userUuid}) {
+    timestamp = DateTime.now();
+    type = 'ping';
+  }
 
-  factory PingMessage.fromJson(Map<String, dynamic> json) =>
-      PingMessage(timestamp: DateTime.parse(json['timestamp'] as String));
+  factory PingMessage.fromJson(Map<String, dynamic> json) => _$PingMessageFromJson(json);
 
   @override
-  Map<String, dynamic> toJson() => {
-        'type': 'ping',
-        'timestamp': timestamp.toIso8601String(),
-      };
+  Map<String, dynamic> toJson() => _$PingMessageToJson(this);
+
+  @override
+  late final String type;
 }
 
+@JsonSerializable()
+class RoomPingMessage extends GameRoomMessage {
+  late final DateTime timestamp;
+  final PingDestination dest;
+
+  RoomPingMessage(super.roomUuid, {required this.dest}) {
+    timestamp = DateTime.now();
+    type = 'roomPing';
+  }
+
+  factory RoomPingMessage.fromJson(Map<String, dynamic> json) => _$RoomPingMessageFromJson(json);
+
+  @override
+  Map<String, dynamic> toJson() => _$RoomPingMessageToJson(this);
+
+  @override
+  late final String type;
+}
+
+@JsonSerializable()
 class PlayerVoteMessage extends GameRoomMessage {
   final String playerUuid;
   final int vote;
 
-  const PlayerVoteMessage({required this.playerUuid, required this.vote});
+  PlayerVoteMessage(super.roomUuid, {required this.playerUuid, required this.vote}) {
+    type = 'playerVote';
+  }
 
-  factory PlayerVoteMessage.fromJson(Map<String, dynamic> json) => PlayerVoteMessage(
-        playerUuid: json['playerUuid'] as String,
-        vote: json['vote'] as int,
-      );
+  factory PlayerVoteMessage.fromJson(Map<String, dynamic> json) => _$PlayerVoteMessageFromJson(json);
 
   @override
-  Map<String, dynamic> toJson() => {
-        'type': 'playerVote',
-        'playerUuid': playerUuid,
-        'vote': vote,
-      };
+  Map<String, dynamic> toJson() => _$PlayerVoteMessageToJson(this);
+
+  @override
+  late final String type;
 }