diff --git a/.env.mysql b/.env.mysql new file mode 100644 index 000000000..2d26597e9 --- /dev/null +++ b/.env.mysql @@ -0,0 +1,70 @@ +SERVER_NAME=evolution +SERVER_TYPE=http +SERVER_PORT=8080 +SERVER_URL=http://localhost:8081 + +SENTRY_DSN= + +TELEMETRY_ENABLED=false +LOG_LEVEL=ERROR,WARN,DEBUG,INFO,LOG,VERBOSE,WEBHOOKS +LOG_COLOR=true +LOG_BAILEYS=error + +EVENT_EMITTER_MAX_LISTENERS=50 +DEL_INSTANCE=false + +DATABASE_PROVIDER=mysql +DATABASE_CONNECTION_URI=mysql://evolution:evolution_password@mysql-db:3306/evolution +DATABASE_CONNECTION_CLIENT_NAME=evolution_exchange +DATABASE_SAVE_DATA_INSTANCE=true +DATABASE_SAVE_DATA_NEW_MESSAGE=true +DATABASE_SAVE_MESSAGE_UPDATE=true +DATABASE_SAVE_DATA_CONTACTS=true +DATABASE_SAVE_DATA_CHATS=true +DATABASE_SAVE_DATA_LABELS=true +DATABASE_SAVE_DATA_HISTORIC=true +DATABASE_SAVE_IS_ON_WHATSAPP=true +DATABASE_SAVE_IS_ON_WHATSAPP_DAYS=7 +DATABASE_DELETE_MESSAGE=true + +RABBITMQ_ENABLED=false +SQS_ENABLED=false +WEBSOCKET_ENABLED=false +PUSHER_ENABLED=false +KAFKA_ENABLED=false + +CACHE_REDIS_ENABLED=true +CACHE_REDIS_URI=redis://redis:6379/6 +CACHE_REDIS_TTL=604800 +CACHE_REDIS_PREFIX_KEY=evolution +CACHE_REDIS_SAVE_INSTANCES=false +CACHE_LOCAL_ENABLED=false + +S3_ENABLED=false + +CONFIG_SESSION_PHONE_CLIENT=Evolution API +CONFIG_SESSION_PHONE_NAME=Chrome + +QRCODE_LIMIT=30 +QRCODE_COLOR='#175197' + +TYPEBOT_ENABLED=false +CHATWOOT_ENABLED=false +OPENAI_ENABLED=false +DIFY_ENABLED=false +N8N_ENABLED=false +EVOAI_ENABLED=false + +AUTHENTICATION_API_KEY=429683C4C977415CAAFCCE10F7D57E11 +AUTHENTICATION_EXPOSE_IN_FETCH_INSTANCES=true + +CORS_ORIGIN=* +CORS_METHODS=GET,POST,PUT,DELETE +CORS_CREDENTIALS=true + +LANGUAGE=en + +MYSQL_DATABASE=evolution +MYSQL_USERNAME=evolution +MYSQL_PASSWORD=evolution_password +MYSQL_ROOT_PASSWORD=root_password diff --git a/.env.postgres b/.env.postgres new file mode 100644 index 000000000..58d1fdb2b --- /dev/null +++ b/.env.postgres @@ -0,0 +1,69 @@ +SERVER_NAME=evolution +SERVER_TYPE=http +SERVER_PORT=8080 +SERVER_URL=http://localhost:8083 + +SENTRY_DSN= + +TELEMETRY_ENABLED=false +LOG_LEVEL=ERROR,WARN,DEBUG,INFO,LOG,VERBOSE,WEBHOOKS +LOG_COLOR=true +LOG_BAILEYS=error + +EVENT_EMITTER_MAX_LISTENERS=50 +DEL_INSTANCE=false + +DATABASE_PROVIDER=postgresql +DATABASE_CONNECTION_URI=postgresql://postgres:postgres@postgres-db:5432/evolution_db?schema=evolution_api +DATABASE_CONNECTION_CLIENT_NAME=evolution_exchange +DATABASE_SAVE_DATA_INSTANCE=true +DATABASE_SAVE_DATA_NEW_MESSAGE=true +DATABASE_SAVE_MESSAGE_UPDATE=true +DATABASE_SAVE_DATA_CONTACTS=true +DATABASE_SAVE_DATA_CHATS=true +DATABASE_SAVE_DATA_LABELS=true +DATABASE_SAVE_DATA_HISTORIC=true +DATABASE_SAVE_IS_ON_WHATSAPP=true +DATABASE_SAVE_IS_ON_WHATSAPP_DAYS=7 +DATABASE_DELETE_MESSAGE=true + +RABBITMQ_ENABLED=false +SQS_ENABLED=false +WEBSOCKET_ENABLED=false +PUSHER_ENABLED=false +KAFKA_ENABLED=false + +CACHE_REDIS_ENABLED=true +CACHE_REDIS_URI=redis://redis:6379/6 +CACHE_REDIS_TTL=604800 +CACHE_REDIS_PREFIX_KEY=evolution +CACHE_REDIS_SAVE_INSTANCES=false +CACHE_LOCAL_ENABLED=false + +S3_ENABLED=false + +CONFIG_SESSION_PHONE_CLIENT=Evolution API +CONFIG_SESSION_PHONE_NAME=Chrome + +QRCODE_LIMIT=30 +QRCODE_COLOR='#175197' + +TYPEBOT_ENABLED=false +CHATWOOT_ENABLED=false +OPENAI_ENABLED=false +DIFY_ENABLED=false +N8N_ENABLED=false +EVOAI_ENABLED=false + +AUTHENTICATION_API_KEY=429683C4C977415CAAFCCE10F7D57E11 +AUTHENTICATION_EXPOSE_IN_FETCH_INSTANCES=true + +CORS_ORIGIN=* +CORS_METHODS=GET,POST,PUT,DELETE +CORS_CREDENTIALS=true + +LANGUAGE=en + +POSTGRES_DATABASE=evolution +POSTGRES_USERNAME=evolution +POSTGRES_PASSWORD=evolution_password diff --git a/docker-compose.mysql.yaml b/docker-compose.mysql.yaml new file mode 100644 index 000000000..1c390af1a --- /dev/null +++ b/docker-compose.mysql.yaml @@ -0,0 +1,88 @@ +version: "3.8" + +services: + api-mysql: + container_name: evolution_api_mysql + build: + context: . + dockerfile: Dockerfile + restart: always + depends_on: + mysql-db: + condition: service_healthy + redis: + condition: service_started + ports: + - "127.0.0.1:8081:8080" + volumes: + - evolution_instances_mysql:/evolution/instances + networks: + - evolution-net + env_file: + - .env.mysql + environment: + - DATABASE_PROVIDER=mysql + - DATABASE_CONNECTION_URI=mysql://evolution:evolution_password@mysql-db:3306/evolution + - CACHE_REDIS_URI=redis://redis:6379/6 + expose: + - "8080" + + frontend: + container_name: evolution_frontend + image: evoapicloud/evolution-manager:latest + restart: always + ports: + - "3000:80" + networks: + - evolution-net + + redis: + container_name: evolution_redis + image: redis:latest + restart: always + command: > + redis-server --port 6379 --appendonly yes + volumes: + - evolution_redis:/data + networks: + evolution-net: + aliases: + - evolution-redis + expose: + - "6379" + + mysql-db: + container_name: mysql_evolution_db + image: mysql:8.0 + restart: always + environment: + MYSQL_DATABASE: ${MYSQL_DATABASE:-evolution} + MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD:-root_password} + MYSQL_USER: ${MYSQL_USERNAME:-evolution} + MYSQL_PASSWORD: ${MYSQL_PASSWORD:-evolution_password} + command: + - --default-authentication-plugin=mysql_native_password + - --character-set-server=utf8mb4 + - --collation-server=utf8mb4_unicode_ci + - --max_connections=1000 + volumes: + - mysql_data:/var/lib/mysql + networks: + - evolution-net + expose: + - "3306" + healthcheck: + test: ["CMD", "mysqladmin", "ping", "-h", "localhost"] + interval: 10s + timeout: 5s + retries: 5 + +volumes: + evolution_instances_mysql: + evolution_redis: + mysql_data: + +networks: + evolution-net: + name: evolution-net + driver: bridge diff --git a/docker-compose.postgres.yaml b/docker-compose.postgres.yaml new file mode 100644 index 000000000..f5722c57b --- /dev/null +++ b/docker-compose.postgres.yaml @@ -0,0 +1,88 @@ +version: "3.8" + +services: + api-postgres: + container_name: evolution_api_postgres + build: + context: . + dockerfile: Dockerfile + restart: always + depends_on: + postgres-db: + condition: service_healthy + redis: + condition: service_started + ports: + - "127.0.0.1:8083:8080" + volumes: + - evolution_instances_postgres:/evolution/instances + networks: + - evolution-net + env_file: + - .env.postgres + environment: + - DATABASE_PROVIDER=postgresql + - DATABASE_CONNECTION_URI=postgresql://postgres:postgres@postgres-db:5432/evolution_db?schema=evolution_api + - CACHE_REDIS_URI=redis://redis:6379/6 + expose: + - "8080" + + frontend: + container_name: evolution_frontend_postgres + image: evoapicloud/evolution-manager:latest + restart: always + ports: + - "3001:80" + networks: + - evolution-net + + redis: + container_name: evolution_redis_postgres + image: redis:latest + restart: always + command: > + redis-server --port 6379 --appendonly yes + volumes: + - evolution_redis_postgres:/data + networks: + evolution-net: + aliases: + - evolution-redis + expose: + - "6379" + + postgres-db: + container_name: postgres_evolution_db + image: postgres:15 + restart: always + environment: + POSTGRES_DB: ${POSTGRES_DATABASE:-evolution} + POSTGRES_USER: ${POSTGRES_USERNAME:-evolution} + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-evolution_password} + command: + - postgres + - -c + - max_connections=1000 + - -c + - listen_addresses=* + volumes: + - postgres_data:/var/lib/postgresql/data + networks: + - evolution-net + expose: + - "5432" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U evolution"] + interval: 10s + timeout: 5s + retries: 5 + +volumes: + evolution_instances_postgres: + evolution_redis_postgres: + postgres_data: + +networks: + evolution-net: + name: evolution-net + driver: bridge diff --git a/docker-compose.yaml b/docker-compose.yaml index e0edee656..299303dda 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -14,7 +14,6 @@ services: - evolution_instances:/evolution/instances networks: - evolution-net - - dokploy-network env_file: - .env expose: @@ -41,9 +40,6 @@ services: evolution-net: aliases: - evolution-redis - dokploy-network: - aliases: - - evolution-redis expose: - "6379" @@ -67,7 +63,6 @@ services: - postgres_data:/var/lib/postgresql/data networks: - evolution-net - - dokploy-network expose: - "5432" @@ -79,6 +74,4 @@ volumes: networks: evolution-net: name: evolution-net - driver: bridge - dokploy-network: - external: true \ No newline at end of file + driver: bridge \ No newline at end of file diff --git a/prisma/mysql-migrations/20250918183912_re_add_lid_to_is_onwhatsapp/migration.sql b/prisma/mysql-migrations/20250918183912_re_add_lid_to_is_onwhatsapp/migration.sql new file mode 100644 index 000000000..32d48b38f --- /dev/null +++ b/prisma/mysql-migrations/20250918183912_re_add_lid_to_is_onwhatsapp/migration.sql @@ -0,0 +1,2 @@ +-- Re-add lid column to IsOnWhatsapp +ALTER TABLE `IsOnWhatsapp` ADD COLUMN `lid` VARCHAR(100); diff --git a/prisma/mysql-schema.prisma b/prisma/mysql-schema.prisma index 71b5a743f..46f597011 100644 --- a/prisma/mysql-schema.prisma +++ b/prisma/mysql-schema.prisma @@ -249,6 +249,8 @@ model Label { updatedAt DateTime @updatedAt @db.Timestamp Instance Instance @relation(fields: [instanceId], references: [id], onDelete: Cascade) instanceId String + + @@unique([labelId, instanceId]) } model Proxy { @@ -655,6 +657,7 @@ model IsOnWhatsapp { id String @id @default(cuid()) remoteJid String @unique @db.VarChar(100) jidOptions String + lid String? @db.VarChar(100) createdAt DateTime @default(dbgenerated("CURRENT_TIMESTAMP")) @db.Timestamp updatedAt DateTime @updatedAt @db.Timestamp } diff --git a/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts b/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts index 60e857fcc..8360f54b2 100644 --- a/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts +++ b/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts @@ -522,32 +522,62 @@ export class BaileysStartupService extends ChannelStartupService { private async getMessage(key: proto.IMessageKey, full = false) { try { - // Use raw SQL to avoid JSON path issues - const webMessageInfo = (await this.prismaRepository.$queryRaw` - SELECT * FROM "Message" - WHERE "instanceId" = ${this.instanceId} - AND "key"->>'id' = ${key.id} - `) as proto.IWebMessageInfo[]; - - if (full) { - return webMessageInfo[0]; - } - if (webMessageInfo[0].message?.pollCreationMessage) { - const messageSecretBase64 = webMessageInfo[0].message?.messageContextInfo?.messageSecret; + // Fetch messages in batches to find the one with matching key.id + // Using pagination instead of arbitrary limit to ensure we find the message + const pageSize = 100; + let pageNumber = 0; + const maxPages = 100; // Maximum 10,000 messages to search + + while (pageNumber < maxPages) { + const messages = await this.prismaRepository.message.findMany({ + where: { + instanceId: this.instanceId, + }, + skip: pageNumber * pageSize, + take: pageSize, + orderBy: { createdAt: 'desc' }, // Most recent first + }); - if (typeof messageSecretBase64 === 'string') { - const messageSecret = Buffer.from(messageSecretBase64, 'base64'); + if (messages.length === 0) { + break; // No more messages + } - const msg = { - messageContextInfo: { messageSecret }, - pollCreationMessage: webMessageInfo[0].message?.pollCreationMessage, - }; + // Filter by key.id (handle both string and object keys) + const webMessageInfo = messages.find((m) => { + try { + const msgKey = typeof m.key === 'string' ? JSON.parse(m.key) : m.key; + return msgKey?.id === key.id; + } catch { + return false; + } + }); + + if (webMessageInfo) { + if (full) { + return webMessageInfo; + } + + const msg = webMessageInfo.message; + if (msg && typeof msg === 'object' && 'pollCreationMessage' in msg) { + const messageSecretBase64 = (msg as any).messageContextInfo?.messageSecret; + + if (typeof messageSecretBase64 === 'string') { + const messageSecret = Buffer.from(messageSecretBase64, 'base64'); + + return { + messageContextInfo: { messageSecret }, + pollCreationMessage: (msg as any).pollCreationMessage, + }; + } + } return msg; } + + pageNumber++; } - return webMessageInfo[0].message; + return { conversation: '' }; } catch { return { conversation: '' }; } @@ -646,7 +676,10 @@ export class BaileysStartupService extends ChannelStartupService { }, msgRetryCounterCache: this.msgRetryCounterCache, generateHighQualityLinkPreview: true, - getMessage: async (key) => (await this.getMessage(key)) as Promise, + getMessage: async (key) => { + const msg = await this.getMessage(key); + return msg as unknown as proto.IMessage; + }, ...browserOptions, markOnlineOnConnect: this.localSettings.alwaysOnline, retryRequestDelayMs: 350, @@ -1637,13 +1670,40 @@ export class BaileysStartupService extends ChannelStartupService { const searchId = originalMessageId || key.id; - const messages = (await this.prismaRepository.$queryRaw` - SELECT * FROM "Message" - WHERE "instanceId" = ${this.instanceId} - AND "key"->>'id' = ${searchId} - LIMIT 1 - `) as any[]; - findMessage = messages[0] || null; + // Find message using pagination to ensure we search all messages + // Replaces arbitrary limit of 100 with proper batched search + const pageSize = 100; + let pageNumber = 0; + const maxPages = 100; // Maximum 10,000 messages to search + + while (pageNumber < maxPages && !findMessage) { + const messages = await this.prismaRepository.message.findMany({ + where: { instanceId: this.instanceId }, + skip: pageNumber * pageSize, + take: pageSize, + orderBy: { createdAt: 'desc' }, // Most recent first + }); + + if (messages.length === 0) { + break; // No more messages + } + + const targetMsg = messages.find((m: any) => { + try { + const msgKey = typeof m.key === 'string' ? JSON.parse(m.key) : m.key; + return msgKey?.id === searchId; + } catch { + return false; + } + }); + + if (targetMsg) { + findMessage = targetMsg; + break; + } + + pageNumber++; + } if (!findMessage?.id) { this.logger.warn(`Original message not found for update. Skipping. Key: ${JSON.stringify(key)}`); @@ -2345,7 +2405,7 @@ export class BaileysStartupService extends ChannelStartupService { if (options?.quoted) { const m = options?.quoted; - const msg = m?.message ? m : ((await this.getMessage(m.key, true)) as WAMessage); + const msg = m?.message ? m : (await this.getMessage(m.key, true)) as unknown as WAMessage; if (msg) { quoted = msg; @@ -4734,40 +4794,67 @@ export class BaileysStartupService extends ChannelStartupService { private async updateMessagesReadedByTimestamp(remoteJid: string, timestamp?: number): Promise { if (timestamp === undefined || timestamp === null) return 0; - // Use raw SQL to avoid JSON path issues - const result = await this.prismaRepository.$executeRaw` - UPDATE "Message" - SET "status" = ${status[4]} - WHERE "instanceId" = ${this.instanceId} - AND "key"->>'remoteJid' = ${remoteJid} - AND ("key"->>'fromMe')::boolean = false - AND "messageTimestamp" <= ${timestamp} - AND ("status" IS NULL OR "status" = ${status[3]}) - `; - - if (result) { - if (result > 0) { - this.updateChatUnreadMessages(remoteJid); + // Fetch messages and filter by key.remoteJid and fromMe in application + const messages = await this.prismaRepository.message.findMany({ + where: { + instanceId: this.instanceId, + messageTimestamp: { + lte: timestamp, + }, + status: { + in: [status[3], null], + }, + }, + }); + + // Filter by remoteJid and fromMe in application + const messagesToUpdate = messages.filter((m) => { + try { + const msgKey = typeof m.key === 'string' ? JSON.parse(m.key) : m.key; + return msgKey?.remoteJid === remoteJid && msgKey?.fromMe === false; + } catch { + return false; } + }); - return result; + // Update all matching messages + const result = await Promise.all( + messagesToUpdate.map((m) => + this.prismaRepository.message.update({ + where: { id: m.id }, + data: { status: status[4] }, + }) + ) + ); + + if (result && result.length > 0) { + await this.updateChatUnreadMessages(remoteJid); + return result.length; } return 0; } private async updateChatUnreadMessages(remoteJid: string): Promise { - const [chat, unreadMessages] = await Promise.all([ - this.prismaRepository.chat.findFirst({ where: { remoteJid } }), - // Use raw SQL to avoid JSON path issues - this.prismaRepository.$queryRaw` - SELECT COUNT(*)::int as count FROM "Message" - WHERE "instanceId" = ${this.instanceId} - AND "key"->>'remoteJid' = ${remoteJid} - AND ("key"->>'fromMe')::boolean = false - AND "status" = ${status[3]} - `.then((result: any[]) => result[0]?.count || 0), - ]); + const chat = await this.prismaRepository.chat.findFirst({ where: { remoteJid } }); + + // Get unread messages count by filtering in application + const messages = await this.prismaRepository.message.findMany({ + where: { + instanceId: this.instanceId, + status: status[3], + }, + }); + + // Count unread messages for this remoteJid (fromMe = false) + const unreadMessages = messages.filter((m) => { + try { + const msgKey = typeof m.key === 'string' ? JSON.parse(m.key) : m.key; + return msgKey?.remoteJid === remoteJid && msgKey?.fromMe === false; + } catch { + return false; + } + }).length; if (chat && chat.unreadMessages !== unreadMessages) { await this.prismaRepository.chat.update({ where: { id: chat.id }, data: { unreadMessages } }); @@ -4777,51 +4864,73 @@ export class BaileysStartupService extends ChannelStartupService { } private async addLabel(labelId: string, instanceId: string, chatId: string) { - const id = cuid(); - - await this.prismaRepository.$executeRawUnsafe( - `INSERT INTO "Chat" ("id", "instanceId", "remoteJid", "labels", "createdAt", "updatedAt") - VALUES ($4, $2, $3, to_jsonb(ARRAY[$1]::text[]), NOW(), NOW()) ON CONFLICT ("instanceId", "remoteJid") - DO - UPDATE - SET "labels" = ( - SELECT to_jsonb(array_agg(DISTINCT elem)) - FROM ( - SELECT jsonb_array_elements_text("Chat"."labels") AS elem - UNION - SELECT $1::text AS elem - ) sub - ), - "updatedAt" = NOW();`, - labelId, - instanceId, - chatId, - id, - ); + try { + // Get existing chat with labels + const chat = await this.prismaRepository.chat.findFirst({ + where: { id: chatId, instanceId }, + }); + + if (!chat) { + return; + } + + // Parse existing labels + let labels: string[] = []; + if (chat.labels) { + try { + labels = typeof chat.labels === 'string' ? JSON.parse(chat.labels) : (chat.labels as unknown as string[]); + } catch { + labels = []; + } + } + + // Add labelId if not already present + if (!labels.includes(labelId)) { + labels.push(labelId); + } + + // Update chat with new labels + await this.prismaRepository.chat.update({ + where: { id: chatId }, + data: { labels: JSON.stringify(labels), updatedAt: new Date() }, + }); + } catch (error) { + this.logger.error(`Error adding label: ${error}`); + } } private async removeLabel(labelId: string, instanceId: string, chatId: string) { - const id = cuid(); - - await this.prismaRepository.$executeRawUnsafe( - `INSERT INTO "Chat" ("id", "instanceId", "remoteJid", "labels", "createdAt", "updatedAt") - VALUES ($4, $2, $3, '[]'::jsonb, NOW(), NOW()) ON CONFLICT ("instanceId", "remoteJid") - DO - UPDATE - SET "labels" = COALESCE ( - ( - SELECT jsonb_agg(elem) - FROM jsonb_array_elements_text("Chat"."labels") AS elem - WHERE elem <> $1 - ), - '[]'::jsonb - ), - "updatedAt" = NOW();`, - labelId, - instanceId, - chatId, - id, - ); + try { + // Get existing chat with labels + const chat = await this.prismaRepository.chat.findFirst({ + where: { id: chatId, instanceId }, + }); + + if (!chat) { + return; + } + + // Parse existing labels + let labels: string[] = []; + if (chat.labels) { + try { + labels = typeof chat.labels === 'string' ? JSON.parse(chat.labels) : (chat.labels as unknown as string[]); + } catch { + labels = []; + } + } + + // Remove labelId + labels = labels.filter((l) => l !== labelId); + + // Update chat with new labels + await this.prismaRepository.chat.update({ + where: { id: chatId }, + data: { labels: JSON.stringify(labels), updatedAt: new Date() }, + }); + } catch (error) { + this.logger.error(`Error removing label: ${error}`); + } } public async baileysOnWhatsapp(jid: string) { diff --git a/src/api/integrations/chatbot/chatwoot/services/chatwoot.service.ts b/src/api/integrations/chatbot/chatwoot/services/chatwoot.service.ts index 906fff188..b341c3af0 100644 --- a/src/api/integrations/chatbot/chatwoot/services/chatwoot.service.ts +++ b/src/api/integrations/chatbot/chatwoot/services/chatwoot.service.ts @@ -1617,18 +1617,59 @@ export class ChatwootService { return; } - // Use raw SQL to avoid JSON path issues - const result = await this.prismaRepository.$executeRaw` - UPDATE "Message" - SET - "chatwootMessageId" = ${chatwootMessageIds.messageId}, - "chatwootConversationId" = ${chatwootMessageIds.conversationId}, - "chatwootInboxId" = ${chatwootMessageIds.inboxId}, - "chatwootContactInboxSourceId" = ${chatwootMessageIds.contactInboxSourceId}, - "chatwootIsRead" = ${chatwootMessageIds.isRead || false} - WHERE "instanceId" = ${instance.instanceId} - AND "key"->>'id' = ${key.id} - `; + // Find and update messages using pagination to ensure we find the correct message + // instead of relying on arbitrary take: 100 limit + const pageSize = 100; + let pageNumber = 0; + const maxPages = 100; // Maximum 10,000 messages to search + let result = 0; + + while (pageNumber < maxPages) { + const messages = await this.prismaRepository.message.findMany({ + where: { + instanceId: instance.instanceId, + }, + skip: pageNumber * pageSize, + take: pageSize, + orderBy: { createdAt: 'desc' }, // Most recent first + }); + + if (messages.length === 0) { + break; // No more messages + } + + // Filter by key.id + const targetMessages = messages.filter((m) => { + try { + const msgKey = typeof m.key === 'string' ? JSON.parse(m.key) : m.key; + return msgKey?.id === key.id; + } catch { + return false; + } + }); + + // Update all matching messages in this batch + for (const msg of targetMessages) { + await this.prismaRepository.message.update({ + where: { id: msg.id }, + data: { + chatwootMessageId: chatwootMessageIds.messageId, + chatwootConversationId: chatwootMessageIds.conversationId, + chatwootInboxId: chatwootMessageIds.inboxId, + chatwootContactInboxSourceId: chatwootMessageIds.contactInboxSourceId, + chatwootIsRead: chatwootMessageIds.isRead || false, + }, + }); + result++; + } + + // If we found matches, we can stop searching + if (targetMessages.length > 0) { + break; + } + + pageNumber++; + } this.logger.verbose(`Update result: ${result} rows affected`); @@ -1642,15 +1683,25 @@ export class ChatwootService { } private async getMessageByKeyId(instance: InstanceDto, keyId: string): Promise { - // Use raw SQL query to avoid JSON path issues with Prisma - const messages = await this.prismaRepository.$queryRaw` - SELECT * FROM "Message" - WHERE "instanceId" = ${instance.instanceId} - AND "key"->>'id' = ${keyId} - LIMIT 1 - `; - - return (messages as MessageModel[])[0] || null; + // Get messages and filter by key.id in application (compatible with both MySQL and PostgreSQL) + const messages = await this.prismaRepository.message.findMany({ + where: { + instanceId: instance.instanceId, + }, + take: 100, // Limit to avoid performance issues + }); + + // Filter by key.id + const targetMessage = messages.find((m) => { + try { + const msgKey = typeof m.key === 'string' ? JSON.parse(m.key) : m.key; + return msgKey?.id === keyId; + } catch { + return false; + } + }); + + return (targetMessage as MessageModel) || null; } private async getReplyToIds( diff --git a/src/api/services/channel.service.ts b/src/api/services/channel.service.ts index 56bec0802..7b63cd26a 100644 --- a/src/api/services/channel.service.ts +++ b/src/api/services/channel.service.ts @@ -723,110 +723,82 @@ export class ChannelStartupService { : createJid(query.where?.remoteJid) : null; - const where = { - instanceId: this.instanceId, - }; + const timestampGte = query?.where?.messageTimestamp?.gte + ? Math.floor(new Date(query.where.messageTimestamp.gte).getTime() / 1000) + : null; + const timestampLte = query?.where?.messageTimestamp?.lte + ? Math.floor(new Date(query.where.messageTimestamp.lte).getTime() / 1000) + : null; - if (remoteJid) { - where['remoteJid'] = remoteJid; - } + // Get chats with latest message + const chats = await this.prismaRepository.chat.findMany({ + where: { + instanceId: this.instanceId, + ...(remoteJid && { remoteJid }), + }, + orderBy: { updatedAt: 'desc' }, + skip: query?.skip || 0, + take: query?.take || 20, + }); - const timestampFilter = - query?.where?.messageTimestamp?.gte && query?.where?.messageTimestamp?.lte - ? Prisma.sql` - AND "Message"."messageTimestamp" >= ${Math.floor(new Date(query.where.messageTimestamp.gte).getTime() / 1000)} - AND "Message"."messageTimestamp" <= ${Math.floor(new Date(query.where.messageTimestamp.lte).getTime() / 1000)}` - : Prisma.sql``; - - const limit = query?.take ? Prisma.sql`LIMIT ${query.take}` : Prisma.sql``; - const offset = query?.skip ? Prisma.sql`OFFSET ${query.skip}` : Prisma.sql``; - - const results = await this.prismaRepository.$queryRaw` - WITH rankedMessages AS ( - SELECT DISTINCT ON ("Message"."key"->>'remoteJid') - "Contact"."id" as "contactId", - "Message"."key"->>'remoteJid' as "remoteJid", - CASE - WHEN "Message"."key"->>'remoteJid' LIKE '%@g.us' THEN COALESCE("Chat"."name", "Contact"."pushName") - ELSE COALESCE("Contact"."pushName", "Message"."pushName") - END as "pushName", - "Contact"."profilePicUrl", - COALESCE( - to_timestamp("Message"."messageTimestamp"::double precision), - "Contact"."updatedAt" - ) as "updatedAt", - "Chat"."name" as "pushName", - "Chat"."createdAt" as "windowStart", - "Chat"."createdAt" + INTERVAL '24 hours' as "windowExpires", - "Chat"."unreadMessages" as "unreadMessages", - CASE WHEN "Chat"."createdAt" + INTERVAL '24 hours' > NOW() THEN true ELSE false END as "windowActive", - "Message"."id" AS "lastMessageId", - "Message"."key" AS "lastMessage_key", - CASE - WHEN "Message"."key"->>'fromMe' = 'true' THEN 'Você' - ELSE "Message"."pushName" - END AS "lastMessagePushName", - "Message"."participant" AS "lastMessageParticipant", - "Message"."messageType" AS "lastMessageMessageType", - "Message"."message" AS "lastMessageMessage", - "Message"."contextInfo" AS "lastMessageContextInfo", - "Message"."source" AS "lastMessageSource", - "Message"."messageTimestamp" AS "lastMessageMessageTimestamp", - "Message"."instanceId" AS "lastMessageInstanceId", - "Message"."sessionId" AS "lastMessageSessionId", - "Message"."status" AS "lastMessageStatus" - FROM "Message" - LEFT JOIN "Contact" ON "Contact"."remoteJid" = "Message"."key"->>'remoteJid' AND "Contact"."instanceId" = "Message"."instanceId" - LEFT JOIN "Chat" ON "Chat"."remoteJid" = "Message"."key"->>'remoteJid' AND "Chat"."instanceId" = "Message"."instanceId" - WHERE "Message"."instanceId" = ${this.instanceId} - ${remoteJid ? Prisma.sql`AND "Message"."key"->>'remoteJid' = ${remoteJid}` : Prisma.sql``} - ${timestampFilter} - ORDER BY "Message"."key"->>'remoteJid', "Message"."messageTimestamp" DESC - ) - SELECT * FROM rankedMessages - ORDER BY "updatedAt" DESC NULLS LAST - ${limit} - ${offset}; - `; - - if (results && isArray(results) && results.length > 0) { - const mappedResults = results.map((contact) => { - const lastMessage = contact.lastMessageId - ? { - id: contact.lastMessageId, - key: contact.lastMessage_key, - pushName: contact.lastMessagePushName, - participant: contact.lastMessageParticipant, - messageType: contact.lastMessageMessageType, - message: contact.lastMessageMessage, - contextInfo: contact.lastMessageContextInfo, - source: contact.lastMessageSource, - messageTimestamp: contact.lastMessageMessageTimestamp, - instanceId: contact.lastMessageInstanceId, - sessionId: contact.lastMessageSessionId, - status: contact.lastMessageStatus, - } - : undefined; - - return { - id: contact.contactId || null, - remoteJid: contact.remoteJid, - pushName: contact.pushName, - profilePicUrl: contact.profilePicUrl, - updatedAt: contact.updatedAt, - windowStart: contact.windowStart, - windowExpires: contact.windowExpires, - windowActive: contact.windowActive, - lastMessage: lastMessage ? this.cleanMessageData(lastMessage) : undefined, - unreadCount: contact.unreadMessages, - isSaved: !!contact.contactId, - }; - }); + // Get all messages for these chats to find the latest + const messages = await this.prismaRepository.message.findMany({ + where: { + instanceId: this.instanceId, + ...(timestampGte && timestampLte && { + messageTimestamp: { + gte: timestampGte, + lte: timestampLte, + }, + }), + }, + orderBy: { messageTimestamp: 'desc' }, + }); - return mappedResults; + // Group messages by remoteJid for O(1) lookup instead of O(n) per chat + // This prevents O(#chats × #messages) complexity + const messagesByRemoteJid = new Map(); + for (const msg of messages) { + try { + const msgKey = typeof msg.key === 'string' ? JSON.parse(msg.key) : msg.key; + const remoteJid = msgKey?.remoteJid; + if (remoteJid) { + if (!messagesByRemoteJid.has(remoteJid)) { + messagesByRemoteJid.set(remoteJid, []); + } + messagesByRemoteJid.get(remoteJid)!.push(msg); + } + } catch { + // Skip messages with invalid keys + } } - return []; + // Map results to expected format + const mappedResults = chats.map((chat) => { + // Get latest message for this chat from pre-grouped map + const chatMessages = messagesByRemoteJid.get(chat.remoteJid); + const lastMessage = chatMessages?.[0]; // Already sorted by timestamp desc + + const now = new Date(); + const windowExpires = chat.createdAt ? new Date(chat.createdAt.getTime() + 24 * 60 * 60 * 1000) : now; + const windowActive = windowExpires > now; + + return { + id: chat.id, + remoteJid: chat.remoteJid, + pushName: chat.name || null, + profilePicUrl: null, + updatedAt: chat.updatedAt || new Date(), + windowStart: chat.createdAt, + windowExpires, + windowActive, + lastMessage: lastMessage ? this.cleanMessageData(lastMessage) : undefined, + unreadCount: chat.unreadMessages || 0, + isSaved: false, + }; + }); + + return mappedResults; } public hasValidMediaContent(message: any): boolean { diff --git a/src/utils/json-query.helper.ts b/src/utils/json-query.helper.ts new file mode 100644 index 000000000..8bab95368 --- /dev/null +++ b/src/utils/json-query.helper.ts @@ -0,0 +1,176 @@ +/** + * Helper utilities for JSON queries compatible with both MySQL and PostgreSQL + * Handles JSON extraction and filtering in a provider-agnostic way + */ + +export class JsonQueryHelper { + /** + * Extract a value from a JSON field + * Works with both string and object JSON values + * + * @param jsonField - The JSON field (can be string or object) + * @param path - The property path (e.g., 'id', 'remoteJid') + * @returns The extracted value or undefined + */ + static extractValue(jsonField: any, path: string): any { + if (!jsonField) { + return undefined; + } + + try { + // Handle string JSON + if (typeof jsonField === 'string') { + const parsed = JSON.parse(jsonField); + return parsed[path]; + } + + // Handle object JSON + if (typeof jsonField === 'object') { + return jsonField[path]; + } + + return undefined; + } catch { + return undefined; + } + } + + /** + * Get nested value from JSON using dot notation path + * + * @param jsonField - The JSON field (can be string or object) + * @param path - The dot notation path (e.g., 'contextInfo.stanzaId') + * @returns The extracted value or undefined + */ + static extractNestedValue(jsonField: any, path: string): any { + if (!jsonField) { + return undefined; + } + + try { + let obj = typeof jsonField === 'string' ? JSON.parse(jsonField) : jsonField; + + const parts = path.split('.'); + for (const part of parts) { + if (obj == null) { + return undefined; + } + obj = obj[part]; + } + + return obj; + } catch { + return undefined; + } + } + + /** + * Convert JSON array to actual array + * + * @param jsonField - The JSON field (can be string or array) + * @returns Parsed array or empty array + */ + static toArray(jsonField: any): any[] { + if (!jsonField) { + return []; + } + + try { + if (typeof jsonField === 'string') { + const parsed = JSON.parse(jsonField); + return Array.isArray(parsed) ? parsed : []; + } + + return Array.isArray(jsonField) ? jsonField : []; + } catch { + return []; + } + } + + /** + * Convert value to JSON string for storage + * + * @param value - The value to serialize + * @returns JSON string + */ + static stringify(value: any): string { + try { + return JSON.stringify(value); + } catch { + return '{}'; + } + } + + /** + * Filter array of objects by JSON field value + * + * @param items - Array of items with JSON fields + * @param jsonFieldName - Name of the JSON field + * @param path - Property path in JSON (e.g., 'id', 'remoteJid') + * @param value - Value to match + * @returns Filtered array + */ + static filterByJsonValue>( + items: T[], + jsonFieldName: keyof T, + path: string, + value: any + ): T[] { + return items.filter((item) => { + const jsonField = item[jsonFieldName]; + const extractedValue = this.extractValue(jsonField, path); + return extractedValue === value; + }); + } + + /** + * Find first item by JSON field value + * + * @param items - Array of items with JSON fields + * @param jsonFieldName - Name of the JSON field + * @param path - Property path in JSON + * @param value - Value to match + * @returns First matching item or undefined + */ + static findByJsonValue>( + items: T[], + jsonFieldName: keyof T, + path: string, + value: any + ): T | undefined { + return items.find((item) => { + const jsonField = item[jsonFieldName]; + const extractedValue = this.extractValue(jsonField, path); + return extractedValue === value; + }); + } + + /** + * Group items by JSON field value + * + * @param items - Array of items + * @param jsonFieldName - Name of the JSON field + * @param path - Property path in JSON + * @returns Map of grouped items + */ + static groupByJsonValue>( + items: T[], + jsonFieldName: keyof T, + path: string + ): Map { + const grouped = new Map(); + + for (const item of items) { + const jsonField = item[jsonFieldName]; + const key = this.extractValue(jsonField, path); + + if (!grouped.has(key)) { + grouped.set(key, []); + } + + grouped.get(key)!.push(item); + } + + return grouped; + } +}