Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions apps/meteor/app/lib/client/methods/sendMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { onClientMessageReceived } from '../../../../client/lib/onClientMessageR
import { settings } from '../../../../client/lib/settings';
import { dispatchToastMessage } from '../../../../client/lib/toast';
import { getUser, getUserId } from '../../../../client/lib/user';
import { upsertThreadMessageInCache } from '../../../../client/lib/utils/threadMessageUtils';
import { Messages, Rooms } from '../../../../client/stores';
import { trim } from '../../../../lib/utils/stringUtils';
import { t } from '../../../utils/lib/i18n';
Expand Down Expand Up @@ -42,9 +43,14 @@ Meteor.methods<ServerMethods>({
return;
}

await onClientMessageReceived(message as IMessage).then((message) => {
Messages.state.store(message);
return clientCallbacks.run('afterSaveMessage', message, { room, user });
await onClientMessageReceived(message as IMessage).then((msg) => {
Messages.state.store(msg);

if (msg.tmid) {
upsertThreadMessageInCache(msg, msg.rid, msg.tmid);
}

return clientCallbacks.run('afterSaveMessage', msg, { room, user });
});
},
});
8 changes: 7 additions & 1 deletion apps/meteor/client/lib/chats/data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { Messages, Rooms, Subscriptions } from '../../stores';
import { settings } from '../settings';
import { getUserId } from '../user';
import { prependReplies } from '../utils/prependReplies';
import { upsertThreadMessageInCache } from '../utils/threadMessageUtils';

export const createDataAPI = ({ rid, tmid }: { rid: IRoom['_id']; tmid: IMessage['_id'] | undefined }): DataAPI => {
const composeMessage = async (
Expand Down Expand Up @@ -162,7 +163,12 @@ export const createDataAPI = ({ rid, tmid }: { rid: IRoom['_id']; tmid: IMessage
};

const pushEphemeralMessage = async (message: Omit<IMessage, 'rid' | 'tmid'>): Promise<void> => {
Messages.state.store({ ...message, rid, ...(tmid && { tmid }) });
const fullMessage = { ...message, rid, ...(tmid && { tmid }) } as IMessage;
Messages.state.store(fullMessage);

if (tmid) {
upsertThreadMessageInCache(fullMessage, rid, tmid);
}
};

const updateMessage = async (message: IEditedMessage, previewUrls?: string[]): Promise<void> => {
Expand Down
2 changes: 2 additions & 0 deletions apps/meteor/client/lib/queryKeys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ export const roomsQueryKeys = {
images: (rid: IRoom['_id'], options?: { startingFromId?: string }) => [...roomsQueryKeys.room(rid), 'images', options] as const,
autocomplete: (text: string) => [...roomsQueryKeys.all, 'autocomplete', text] as const,
discussions: (rid: IRoom['_id'], ...args: [filter: { text?: string }]) => [...roomsQueryKeys.room(rid), 'discussions', ...args] as const,
threadMessages: (rid: IRoom['_id'], tmid: IMessage['_id']) => [...roomsQueryKeys.room(rid), 'threads', tmid, 'messages'] as const,
threadMainMessage: (rid: IRoom['_id'], tmid: IMessage['_id']) => [...roomsQueryKeys.room(rid), 'threads', tmid, 'main-message'] as const,
};

export const subscriptionsQueryKeys = {
Expand Down
69 changes: 69 additions & 0 deletions apps/meteor/client/lib/utils/threadMessageUtils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import type { IMessage, MessageAttachment } from '@rocket.chat/core-typings';
import { createPredicateFromFilter } from '@rocket.chat/mongo-adapter';
import type { QueryClient } from '@tanstack/react-query';
import type { Condition, Filter } from 'mongodb';

import { queryClient as defaultQueryClient } from '../queryClient';
import { roomsQueryKeys } from '../queryKeys';

export type NotifyRoomRidDeleteBulkEvent = {
rid: IMessage['rid'];
excludePinned: boolean;
ignoreDiscussion: boolean;
ts: Condition<Date>;
users: string[];
ids?: string[];
showDeletedStatus?: boolean;
} & (
| {
filesOnly: true;
replaceFileAttachmentsWith?: MessageAttachment;
}
| {
filesOnly?: false;
}
);

export const createDeleteCriteria = (params: NotifyRoomRidDeleteBulkEvent): ((message: IMessage) => boolean) => {
const query: Filter<IMessage> = {};

if (params.ids) {
query._id = { $in: params.ids };
} else {
query.ts = params.ts;
}

if (params.excludePinned) {
query.pinned = { $ne: true };
}

if (params.ignoreDiscussion) {
query.drid = { $exists: false };
}
if (params.users?.length) {
query['u.username'] = { $in: params.users };
}

return createPredicateFromFilter(query);
};

export const upsertThreadMessageInCache = (
message: IMessage,
rid: IMessage['rid'],
tmid: IMessage['_id'],
client: QueryClient = defaultQueryClient,
): void => {
const queryKey = roomsQueryKeys.threadMessages(rid, tmid);
client.setQueryData<IMessage[]>(queryKey, (old) => {
if (!old) {
return [message];
}
const idx = old.findIndex((m) => m._id === message._id);
if (idx >= 0) {
const updated = [...old];
updated[idx] = message;
return updated;
}
return [...old, message].sort((a, b) => new Date(a.ts).getTime() - new Date(b.ts).getTime());
});
};
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { useDateScroll } from '../../../hooks/useDateScroll';
import { useFirstUnreadMessageId } from '../../../hooks/useFirstUnreadMessageId';
import { useMessageListNavigation } from '../../../hooks/useMessageListNavigation';
import { useLegacyThreadMessageListScrolling } from '../hooks/useLegacyThreadMessageListScrolling';
import { useLegacyThreadMessages } from '../hooks/useLegacyThreadMessages';
import { useThreadMessagesQuery } from '../hooks/useThreadMessagesQuery';
import './threads.css';

const isMessageSequential = (current: IMessage, previous: IMessage | undefined, groupingRange: number): boolean => {
Expand Down Expand Up @@ -54,7 +54,7 @@ const ThreadMessageList = ({ mainMessage }: ThreadMessageListProps): ReactElemen
const { t } = useTranslation();
const { innerRef, bubbleRef, listStyle, ...bubbleDate } = useDateScroll();

const { messages, loading } = useLegacyThreadMessages(mainMessage._id);
const { data: messages = [], isLoading: loading } = useThreadMessagesQuery(mainMessage._id);

const { innerRef: listScrollRef, jumpToRef } = useLegacyThreadMessageListScrolling(mainMessage);

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,60 +1,19 @@
import type { IMessage, IThreadMainMessage, MessageAttachment } from '@rocket.chat/core-typings';
import { createPredicateFromFilter } from '@rocket.chat/mongo-adapter';
import { useStream } from '@rocket.chat/ui-contexts';
import type { UseQueryResult } from '@tanstack/react-query';
import { useQueryClient, useQuery } from '@tanstack/react-query';
import type { Condition, Filter } from 'mongodb';
import { useCallback, useEffect, useRef } from 'react';

import { useGetMessageByID } from './useGetMessageByID';
import { withDebouncing } from '../../../../../../lib/utils/highOrderFunctions';
import { onClientMessageReceived } from '../../../../../lib/onClientMessageReceived';
import { roomsQueryKeys } from '../../../../../lib/queryKeys';
import { modifyMessageOnFilesDelete } from '../../../../../lib/utils/modifyMessageOnFilesDelete';
import { createDeleteCriteria } from '../../../../../lib/utils/threadMessageUtils';
import { useRoom } from '../../../contexts/RoomContext';

type RoomMessagesRidEvent = IMessage;

type NotifyRoomRidDeleteBulkEvent = {
rid: IMessage['rid'];
excludePinned: boolean;
ignoreDiscussion: boolean;
ts: Condition<Date>;
users: string[];
ids?: string[]; // message ids have priority over ts
showDeletedStatus?: boolean;
} & (
| {
filesOnly: true;
replaceFileAttachmentsWith?: MessageAttachment;
}
| {
filesOnly?: false;
}
);

const createDeleteCriteria = (params: NotifyRoomRidDeleteBulkEvent): ((message: IMessage) => boolean) => {
const query: Filter<IMessage> = {};

if (params.ids) {
query._id = { $in: params.ids };
} else {
query.ts = params.ts;
}

if (params.excludePinned) {
query.pinned = { $ne: true };
}

if (params.ignoreDiscussion) {
query.drid = { $exists: false };
}
if (params.users?.length) {
query['u.username'] = { $in: params.users };
}

return createPredicateFromFilter(query);
};

const useSubscribeToMessage = () => {
const subscribeToRoomMessages = useStream('room-messages');
const subscribeToNotifyRoom = useStream('notify-room');
Expand All @@ -67,9 +26,9 @@ const useSubscribeToMessage = () => {
onDelete,
onFilesDelete,
}: {
onMutate?: (message: IMessage) => void;
onDelete?: () => void;
onFilesDelete?: (replaceFileAttachmentsWith?: MessageAttachment) => void;
onMutate?: (message: IMessage) => void | Promise<void>;
onDelete?: () => void | Promise<void>;
onFilesDelete?: (replaceFileAttachmentsWith?: MessageAttachment) => void | Promise<void>;
},
) => {
const unsubscribeFromRoomMessages = subscribeToRoomMessages(message.rid, (event: RoomMessagesRidEvent) => {
Expand Down Expand Up @@ -120,7 +79,7 @@ export const useThreadMainMessageQuery = (
}, [tmid]);

return useQuery({
queryKey: ['rooms', room._id, 'threads', tmid, 'main-message'] as const,
queryKey: roomsQueryKeys.threadMainMessage(room._id, tmid),

queryFn: async ({ queryKey }) => {
const mainMessage = await getMessage(tmid);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import { isThreadMessage, type IMessage, type IRoom, type IThreadMainMessage, type IThreadMessage } from '@rocket.chat/core-typings';
import { useMethod, useStream } from '@rocket.chat/ui-contexts';
import { useQuery, useQueryClient } from '@tanstack/react-query';
import { useEffect } from 'react';

import { onClientMessageReceived } from '../../../../../lib/onClientMessageReceived';
import { roomsQueryKeys } from '../../../../../lib/queryKeys';
import { modifyMessageOnFilesDelete } from '../../../../../lib/utils/modifyMessageOnFilesDelete';
import { createDeleteCriteria, upsertThreadMessageInCache } from '../../../../../lib/utils/threadMessageUtils';
import { useRoom } from '../../../contexts/RoomContext';

const processMessages = async (messages: IMessage[]): Promise<IMessage[]> => {
return Promise.all(messages.map((msg) => onClientMessageReceived(msg)));
};

export const useThreadMessagesQuery = (tmid: IThreadMainMessage['_id'], rid?: IRoom['_id']) => {
const room = useRoom();
const roomId = rid ?? room._id;

const queryClient = useQueryClient();
const queryKey = roomsQueryKeys.threadMessages(roomId, tmid);
const getThreadMessages = useMethod('getThreadMessages');

const subscribeToRoomMessages = useStream('room-messages');
const subscribeToNotifyRoom = useStream('notify-room');

useEffect(() => {
const currentQueryKey = roomsQueryKeys.threadMessages(roomId, tmid);

const unsubscribeFromRoomMessages = subscribeToRoomMessages(roomId, async (event) => {
if (event.tmid !== tmid) {
return;
}

const processed = await onClientMessageReceived(event);
upsertThreadMessageInCache(processed, roomId, tmid, queryClient);
});

const unsubscribeFromDeleteMessage = subscribeToNotifyRoom(`${roomId}/deleteMessage`, (event) => {
queryClient.setQueryData<IThreadMessage[]>(currentQueryKey, (old) => {
if (!old) {
return old;
}
return old.filter((m) => m._id !== event._id);
});
});

const unsubscribeFromDeleteMessageBulk = subscribeToNotifyRoom(`${roomId}/deleteMessageBulk`, (bulkParams) => {
const matchDeleteCriteria = createDeleteCriteria(bulkParams);

queryClient.setQueryData<IThreadMessage[]>(currentQueryKey, (old) => {
if (!old) {
return old;
}

if (bulkParams.filesOnly) {
return old.map((msg) => {
if (matchDeleteCriteria(msg)) {
return modifyMessageOnFilesDelete(msg, bulkParams.replaceFileAttachmentsWith);
}
return msg;
});
}

return old.filter((msg) => !matchDeleteCriteria(msg));
});
});

return () => {
unsubscribeFromRoomMessages();
unsubscribeFromDeleteMessage();
unsubscribeFromDeleteMessageBulk();
};
}, [tmid, roomId, queryClient, subscribeToRoomMessages, subscribeToNotifyRoom]);
Comment on lines +27 to +74
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I understand we already listen to these streams for the main channel message list (here).

Since we already listen to the streams, could we move this routing logic there? (I mean deciding whether or not it is a thread message and adding it to cache).

I believe the other upsert can stay as optimistic update (e.g sendMessage.ts)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @aleksandernsilva, thanks for the review!

I looked into this but I think keeping the subscription in useThreadMessagesQuery is the better approach here. A few reasons:

LegacyRoomManager writes into the zustand Messages store which is deprecated. The whole point of this PR is to move thread messages into a react-query cache instead. Routing through LegacyRoomManager would mean coupling react-query cache updates to legacy code we're trying to move away from.

useThreadMainMessageQuery already follows this same pattern. It has its own stream subscription for the main thread message. This just mirrors that for thread replies, so it's consistent.

About the duplicate subscription concern - I dug into SDKClient.ts and the SDK multiplexes stream callbacks behind a single DDP subscription per stream-name/eventKey. So subscribing to room-messages for the same room from two places doesn't create an extra network connection. It just adds another callback to the client-side emitter. The DDP sub is only torn down when all listeners are gone.

The hook subscription is lifecycle-scoped to when the thread panel is open, which feels right. LegacyRoomManager runs for the entire room session.

The upsertThreadMessageInCache call in sendMessage.ts stays as an optimistic update as you suggested. That part makes sense to keep there.


return useQuery({
queryKey,
queryFn: async () => {
const messages = await getThreadMessages({ tmid });
const filtered = messages.filter(
(msg): msg is IThreadMessage => isThreadMessage(msg) && msg.tmid === tmid && msg._id !== tmid && msg._hidden !== true,
);
const sorted = filtered.sort((a, b) => a.ts.getTime() - b.ts.getTime());
return processMessages(sorted) as Promise<Array<IThreadMessage>>;
},
});
};
Loading
Loading