消息分表的目的应该包含2方面:性能+可维护性,这里只谈性能。
曾有幸参与过大数据项目,前期使用mysql存储,当单表达到2000W条数据后,查询变慢,达到5000W条时(还有十几个表同样大,甚至有单表达到1亿条的)有几十个连接并发查询,mysql就经常崩溃死锁。
但是如何分表呢?
消息分表面临以下2个挑战:
- 消息路由。必须确保同一个会话的聊天消息存储在1个表中,否则如何查询聊天历史并还原?
- 聊天历史。客户端拉取聊天历史,事先并不知道有多少条消息,那些时间有。所以需要服务端查询到所有时间的所有消息,并按照时间逆序返回。
接下来分析几个开源项目的方案,以供我们参考。
引入RelatedShip概念,A(from_id=2)->B(to_id=4)发消息时,计算smallID和bigID,插入到关系表中,根据数据库返回的自增长ID % 8来路由,后续所有A->B,B->A的消息,都去计算samllID和bigID,查询releatedID % 8来路由消息。
- 好处:简单,不需要跨表查询。消息分布相对均匀,因为releated是自增长的,具有顺序性
- 坏处:坏处,每有一条消息存储,数据库查询次数增多,效率变低。可以通过引入缓存解决,问题不大。
关键代码:
if (nFromId != nToId) {
nSessionId = CSessionModel::getInstance()->getSessionId(nFromId, nToId, nSessionType, nMsgItem, false,
nDomainId);
if (INVALID_VALUE == nSessionId) {
nSessionId = CSessionModel::getInstance()->addSession(nFromId, nToId, nSessionType, nMsgItem,
nDomainId);
}// End if (INVALID_VALUE==)
nPeerSessionId = CSessionModel::getInstance()->getSessionId(nToId, nFromId, nSessionType, nMsgItem,
false, nDomainId);
if (INVALID_VALUE == nPeerSessionId) {
nPeerSessionId = CSessionModel::getInstance()->addSession(nToId, nFromId, nSessionType, nMsgItem,
nDomainId);
} // End if(INVALID_VALUE)
uint32_t nRelateId = CRelationModel::getInstance()->getRelationId(nFromId, nToId, nMsgItem, true);
if (nSessionId != INVALID_VALUE && nRelateId != INVALID_VALUE) {
msgIdGotLock.lock();
nMsgId = pMsgModel->getMsgId(nRelateId);
uint32_t curTime = (uint32_t) time(0);
nMsgId += curTime;
msgIdGotLock.unlock();
if (nMsgId != INVALID_VALUE) {
pMsgModel->sendMessage(nRelateId, nFromId, nToId, nMsgType, nMsgItem, nCreateTime, nMsgId,
strMsgData, nDomainId);
CSessionModel::getInstance()->updateSession(nSessionId, nNow);
CSessionModel::getInstance()->updateSession(nPeerSessionId, nNow);
} else { // End if(nMsgId != INVALID_VALUE)
WARN("msgId is invalid. fromId=%llu, appId=%u, domainId=%u, toId=%llu, nRelateId=%u, nSessionId=%u, nMsgType=%u, nMsgItem=%u",
nFromId, nAppId, nDomainId, nToId, nRelateId, nSessionId, nMsgType, nMsgItem);
}// End else
} else {//End if(nSessionId != INVALID_VALUE&&)
WARN("sessionId or relateId is invalid. fromId=%llu, appId=%u, domainId=%u, toId=%llu, nRelateId=%u, nSessionId=%u, nMsgType=%u, nMsgItem=%u",
nFromId, nAppId, nDomainId, nToId, nRelateId, nSessionId, nMsgType, nMsgItem);
} // End else
} else {//End if (nFromId != nToId)
WARN("send msg to self. fromId=%llu, appId=%u, domainId=%u, toId=%llu, msgType=%u, nMsgItem=%u", nFromId,
nAppId, nDomainId, nToId, nMsgType, nMsgItem);
} // End else
uint32_t CRelationModel::getRelationId(uint64_t nUserAId, uint64_t nUserBId, uint64_t nMsgItem, bool bAdd) {
uint32_t nRelationId = INVALID_VALUE;
if (nUserAId == 0 || nUserBId == 0) {
WARN("invalied user id:%llu->%llu", nUserAId, nUserBId);
return nRelationId;
}
CDBManager *pDBManager = CDBManager::getInstance();
CDBConn *pDBConn = pDBManager->GetDBConn("teamtalk_slave");
if (pDBConn) {
uint64_t nBigId = nUserAId > nUserBId ? nUserAId : nUserBId;
uint64_t nSmallId = nUserAId > nUserBId ? nUserBId : nUserAId;
string strSql = "select id from IMRelationShip where smallId=" + int2string(nSmallId) + " and bigId=" +
int2string(nBigId) + " and msgItem=" + int2string(nMsgItem) + " and status = 0";
CResultSet *pResultSet = pDBConn->ExecuteQuery(strSql.c_str());
if (pResultSet) {
while (pResultSet->Next()) {
nRelationId = pResultSet->GetInt("id");
}
delete pResultSet;
} else {
WARN("there is no result for sql:%s", strSql.c_str());
}
pDBManager->RelDBConn(pDBConn);
if (nRelationId == INVALID_VALUE && bAdd) {
nRelationId = addRelation(nSmallId, nBigId, nMsgItem);
}
} else {
ERROR("no db connection for teamtalk_slave");
}
return nRelationId;
}查询历史消息的时候,也非常的简单,只需要根据from_id和to_id计算smallID和bigID后,得到releatedID,从消息表中查询,再加上limit和oder by即可。
关键代码:
void CMessageModel::getMessage(uint64_t nUserId, uint64_t nPeerId, uint64_t nMsgItem, uint32_t nMsgId,
uint32_t nMsgCnt, list<IM::BaseDefine::MsgInfo> &lsMsg, uint32_t nDomainId) {
uint32_t nRelateId = CRelationModel::getInstance()->getRelationId(nUserId, nPeerId, nMsgItem, false);
if (nRelateId != INVALID_VALUE) {
CDBManager *pDBManager = CDBManager::getInstance();
CDBConn *pDBConn = pDBManager->GetDBConn("teamtalk_slave");
if (pDBConn) {
string strTableName = "IMMessage_" + int2string(nRelateId % 8);
string strSql;
if (nMsgId == 0) {
strSql = "select * from " + strTableName
+ " force index (idx_relateId_status_created) where relateId= " + int2string(nRelateId)
+ " and domainId=" + int2string(nDomainId)
+ " and status = 0 order by created desc, id desc limit " + int2string(nMsgCnt);
} else {
strSql = "select * from " + strTableName
+ " force index (idx_relateId_status_created) where relateId= " + int2string(nRelateId)
+ " and domainId=" + int2string(nDomainId)
+ " and status = 0 and msgId <=" + int2string(nMsgId)
+ " order by created desc, id desc limit " + int2string(nMsgCnt);
}
CResultSet *pResultSet = pDBConn->ExecuteQuery(strSql.c_str());
if (pResultSet) {
while (pResultSet->Next()) {
IM::BaseDefine::MsgInfo cMsg;
cMsg.set_msg_id(pResultSet->GetInt("msgId"));
cMsg.set_from_session_id(pResultSet->GetLongLong("fromId"));
cMsg.set_create_time(pResultSet->GetInt("created"));
cMsg.set_msg_item(pResultSet->GetLongLong("msgItem"));
uint32_t nMsgType = pResultSet->GetInt("type");
if (IM::BaseDefine::MsgType_IsValid(nMsgType)) {
cMsg.set_msg_data(pResultSet->GetString("content"));
cMsg.set_msg_type((IM::BaseDefine::MsgType) nMsgType);
lsMsg.push_back(cMsg);
} else if (IM::BaseDefine::ExtMsgType_IsValid(nMsgType)) {
cMsg.set_msg_data(OLD_VERSION_NOTICE);
cMsg.set_msg_type(IM::BaseDefine::MSG_TYPE_CONSULT_TEXT);
cMsg.mutable_ext_info()->set_msg_data(pResultSet->GetString("content"));
cMsg.mutable_ext_info()->set_msg_type(nMsgType);
lsMsg.push_back(cMsg);
} else {
WARN("invalid msgType. userId=%llu, peerId=%llu, msgId=%u, msgCnt=%u, msgType=%u", nUserId,
nPeerId, nMsgId, nMsgCnt, nMsgType);
}
}
delete pResultSet;
} else {
log("no result set: %s", strSql.c_str());
}
pDBManager->RelDBConn(pDBConn);
if (!lsMsg.empty()) {
CAudioModel::getInstance()->readAudios(lsMsg);
}
} else {
ERROR("no db connection for teamtalk_slave");
}
} else {
WARN("no relation between %llu and %llu", nUserId, nPeerId);
}
} 来源:https://mp.weixin.qq.com/s/TYUNPgf_3rkBr38rNlEZ2g

瓜子采用扩散写的方式存储im消息,所以由send和receive组成,关于扩散读还是扩散写,引用 https://github.com/alberliu/goim 的解释:
- 首先解释一下,什么是读扩散,什么是写扩散
- 读扩散:当两个客户端A,B发消息时,首先建立一个会话,A,B发所发的每条消息都会写到这个会话里,消息同步的时候,只需要查询这个会话即可。群发消息也是一样,群组内部发送消息时,也是先建立一个会话,都将这个消息写入这个会话中,触发消息同步时,只需要查询这个会话就行,读扩散的好处就是,每个消息只需要写入数据库一次就行,但是,每个会话都要维持一个消息序列,作消息同步,客户端要上传N个(用户会话个数)序列号,服务器要为这N个序列号做消息同步,想想一下,成百上千个序列号,是不是有点恐怖。
- 写扩散:就是每个用户维持一个消息列表,当有其他用户给这个用户发送消息时,给这个用户的消息列表插入一条消息即可,这样做的好处就是,每个用户只需要维护一个消息列表,也只需要维护一组序列号,坏处就是,一个群组有多少人,就要插入多少条消息,DB的压力会增大。 如何选型呢,我采用的时写扩散,据说,微信3000人的群也是这样干的。当然也有一些IM,对大群做了特殊处理,对超大群采用读扩散的模式。
这样所有发送的消息 fromId % 4和所有收到的消息 toId % 4都始终路由到1个表中了。只是还原聊天场景的时候,稍微麻烦一点,需要联合查询receive和send表。
注:咨询作者后有些出入,见以下
- 问:请问消息漫游功能,还原历史聊天场景的时候,是需要跨receive和send表查询的吧?想知道您关于“类似QQ下拉刷新,每次返回20条消息”给客户端MySQL查询的思路。感觉好像和拉离线消息的流程实际有些出入唉,这里不是很明白,感谢
- 答:这个点我们后来跟文章里写的稍微有点不同。对于单聊消息,我们只检索双方的发送消息表(可能不在一个分库),每个表都查出两人之间的20条记录,然后用拉链法聚合,返回最近20条;群聊由于是按照会话ID存储的,所以直接查会话的消息就好。
- 感谢作者:封宇 公众号:普通程序员
不过缺点也比较明显,消息会分布不均匀,活跃用户和非活跃用户导致。
引用 http://docs.wildfirechat.cn/faq/server.html
0.0.3. Q. 消息存储在数据库中的那张表中
A. 使用h2db存储消息时,消息存储在t_message表中;使用mysql存储消息时,消息存储在t_message_X表中,X等于 月份-1+(年份%3)12*,例如2019年4月存储在t_message_3表中(4-1+(2019%3)*12)。
感觉这种方案好维护,性能也比较不错。查询历史记录时逻辑相对简单,据Wildfirechat介绍,专业版支持百万级别用户,可能这种设计比较适合这种用户量级的场景。
不过缺点也比较明显,消息会分布不均匀(比如某月用户活跃,该表就较大)
亿级别的好像也是这种方案,见文章:https://blog.csdn.net/weixin_42018518/article/details/90897824
如果考虑选择扩散读,则建议使用Wildfirechat方案,按月份分片。如果选择扩散写,则建议使用瓜子im方案,分接收和发送表,按id mod存。
因为之前对Teamtalk比较熟悉,想尝试一下扩散写的方案,所以CoffeeChat最终使用瓜子IM的方案。