From 16dd3f08c1e5396d5f9ff3f13417901bc4e4b8b9 Mon Sep 17 00:00:00 2001
From: Emelia Smith <ThisIsMissEm@users.noreply.github.com>
Date: Fri, 9 Jun 2023 19:29:16 +0200
Subject: [PATCH] Fix performance of streaming by parsing message JSON once
 (#25278)

---
 streaming/index.js | 60 +++++++++++++++++++++++++++++-----------------
 1 file changed, 38 insertions(+), 22 deletions(-)

diff --git a/streaming/index.js b/streaming/index.js
index 279ebbad83..4b2607ed92 100644
--- a/streaming/index.js
+++ b/streaming/index.js
@@ -52,18 +52,31 @@ const redisUrlToClient = async (defaultConfig, redisUrl) => {
 };
 
 /**
+ * Attempts to safely parse a string as JSON, used when both receiving a message
+ * from redis and when receiving a message from a client over a websocket
+ * connection, this is why it accepts a `req` argument.
  * @param {string} json
- * @param {any} req
+ * @param {any?} req
  * @returns {Object.<string, any>|null}
  */
 const parseJSON = (json, req) => {
   try {
     return JSON.parse(json);
   } catch (err) {
-    if (req.accountId) {
-      log.warn(req.requestId, `Error parsing message from user ${req.accountId}: ${err}`);
+    /* FIXME: This logging isn't great, and should probably be done at the
+     * call-site of parseJSON, not in the method, but this would require changing
+     * the signature of parseJSON to return something akin to a Result type:
+     * [Error|null, null|Object<string,any}], and then handling the error
+     * scenarios.
+     */
+    if (req) {
+      if (req.accountId) {
+        log.warn(req.requestId, `Error parsing message from user ${req.accountId}: ${err}`);
+      } else {
+        log.silly(req.requestId, `Error parsing message from ${req.remoteAddress}: ${err}`);
+      }
     } else {
-      log.silly(req.requestId, `Error parsing message from ${req.remoteAddress}: ${err}`);
+      log.warn(`Error parsing message from redis: ${err}`);
     }
     return null;
   }
@@ -163,7 +176,7 @@ const startServer = async () => {
   const { redisParams, redisUrl, redisPrefix } = redisConfigFromEnv(process.env);
 
   /**
-   * @type {Object.<string, Array.<function(string): void>>}
+   * @type {Object.<string, Array.<function(Object<string, any>): void>>}
    */
   const subs = {};
 
@@ -203,7 +216,10 @@ const startServer = async () => {
       return;
     }
 
-    callbacks.forEach(callback => callback(message));
+    const json = parseJSON(message, null);
+    if (!json) return;
+
+    callbacks.forEach(callback => callback(json));
   };
 
   /**
@@ -225,7 +241,7 @@ const startServer = async () => {
 
   /**
    * @param {string} channel
-   * @param {function(string): void} callback
+   * @param {function(Object<string, any>): void} callback
    */
   const unsubscribe = (channel, callback) => {
     log.silly(`Removing listener for ${channel}`);
@@ -369,7 +385,7 @@ const startServer = async () => {
 
   /**
    * @param {any} req
-   * @returns {string}
+   * @returns {string|undefined}
    */
   const channelNameFromPath = req => {
     const { path, query } = req;
@@ -478,15 +494,11 @@ const startServer = async () => {
   /**
    * @param {any} req
    * @param {SystemMessageHandlers} eventHandlers
-   * @returns {function(string): void}
+   * @returns {function(object): void}
    */
   const createSystemMessageListener = (req, eventHandlers) => {
     return message => {
-      const json = parseJSON(message, req);
-
-      if (!json) return;
-
-      const { event } = json;
+      const { event } = message;
 
       log.silly(req.requestId, `System message for ${req.accountId}: ${event}`);
 
@@ -603,19 +615,16 @@ const startServer = async () => {
    * @param {function(string, string): void} output
    * @param {function(string[], function(string): void): void} attachCloseHandler
    * @param {boolean=} needsFiltering
-   * @returns {function(string): void}
+   * @returns {function(object): void}
    */
   const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
     const accountId = req.accountId || req.remoteAddress;
 
     log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);
 
+    // Currently message is of type string, soon it'll be Record<string, any>
     const listener = message => {
-      const json = parseJSON(message, req);
-
-      if (!json) return;
-
-      const { event, payload, queued_at } = json;
+      const { event, payload, queued_at } = message;
 
       const transmit = () => {
         const now = new Date().getTime();
@@ -1198,8 +1207,15 @@ const startServer = async () => {
     ws.on('close', onEnd);
     ws.on('error', onEnd);
 
-    ws.on('message', data => {
-      const json = parseJSON(data, session.request);
+    ws.on('message', (data, isBinary) => {
+      if (isBinary) {
+        log.debug('Received binary data, closing connection');
+        ws.close(1003, 'The mastodon streaming server does not support binary messages');
+        return;
+      }
+      const message = data.toString('utf8');
+
+      const json = parseJSON(message, session.request);
 
       if (!json) return;
 
-- 
GitLab