看了一下内核代码,似乎linux 的sysv msgq 没有 原子操作必须 < 4k的限制,
只是在内核存储的时候,是以每块4076的切割后存储。
DATALEN_MSG == 4096 – sizeof(struct msg_msg) == 4076
asmlinkage long sys_msgsnd (int msqid, struct msgbuf *msgp, size_t msgsz, int msgflg)
{
struct msg_queue *msq;
struct msg_msg *msg;
long mtype;
int err;
if (msgsz > msg_ctlmax || (long) msgsz < 0 || msqid < 0) return -EINVAL; if (get_user(mtype, &msgp->mtype))
return -EFAULT;
if (mtype < 1) return -EINVAL;
{
struct msg_queue *msq;
struct msg_msg *msg;
long mtype;
int err;
if (msgsz > msg_ctlmax || (long) msgsz < 0 || msqid < 0) return -EINVAL; if (get_user(mtype, &msgp->mtype))
return -EFAULT;
if (mtype < 1) return -EINVAL;
// 从用户空间拷贝数据到内核,同时把数据切割为DATALEN_MSG(4096 – sizeof(struct msg_msg))大小
// 每块内存用msg_msg格式组成链表形式
// 在分块拷贝时可能会被其他调用中断
msg = load_msg(msgp->mtext, msgsz);
if(IS_ERR(msg))
return PTR_ERR(msg);
if(IS_ERR(msg))
return PTR_ERR(msg);
msg->m_type = mtype;
msg->m_ts = msgsz;
msg->m_ts = msgsz;
// 对这个msg queue进行加锁(非整个系统的sysv queue,是一个比较细的锁)
// 同时从ipc array中获取相应的msgq的指针
msq = msg_lock(msqid);
err=-EINVAL;
if(msq==NULL)
goto out_free;
retry:
err= -EIDRM;
if (msg_checkid(msq,msqid))
goto out_unlock_free;
err=-EINVAL;
if(msq==NULL)
goto out_free;
retry:
err= -EIDRM;
if (msg_checkid(msq,msqid))
goto out_unlock_free;
err=-EACCES;
if (ipcperms(&msq->q_perm, S_IWUGO))
goto out_unlock_free;
if (ipcperms(&msq->q_perm, S_IWUGO))
goto out_unlock_free;
//如果新的msg size大于系统最大值,msg count大于系统最大值(比较奇怪q_qbytes 的定义可能没有很明确,所以最大消息数和msg size都用了
//同样的定义
if(msgsz + msq->q_cbytes > msq->q_qbytes ||
1 + msq->q_qnum > msq->q_qbytes) {
struct msg_sender s;
1 + msq->q_qnum > msq->q_qbytes) {
struct msg_sender s;
if(msgflg&IPC_NOWAIT) {
err=-EAGAIN;
goto out_unlock_free;
}
err=-EAGAIN;
goto out_unlock_free;
}
//把自己加入到等待队列中
ss_add(msq, &s);
ss_add(msq, &s);
//解锁
msg_unlock(msqid);
msg_unlock(msqid);
//切换进程
schedule();
schedule();
//被切换回来
current->state= TASK_RUNNING;
current->state= TASK_RUNNING;
//加锁
msq = msg_lock(msqid);
err = -EIDRM;
if(msq==NULL)
goto out_free;
err = -EIDRM;
if(msq==NULL)
goto out_free;
//把睡眠队列删了
ss_del(&s);
ss_del(&s);
if (signal_pending(current)) {
err=-EINTR;
goto out_unlock_free;
}
goto retry;
}
msq->q_lspid = current->pid;
msq->q_stime = CURRENT_TIME;
msq->q_stime = CURRENT_TIME;
//如果有睡眠中的接收进程,则直接唤醒接收进程,然后把消息投递给该进程
//如果没有,则加入到该msgq的链表中
if(!pipelined_send(msq,msg)) {
/* noone is waiting for this message, enqueue it */
list_add_tail(&msg->m_list,&msq->q_messages);
msq->q_cbytes += msgsz;
msq->q_qnum++;
atomic_add(msgsz,&msg_bytes);
atomic_inc(&msg_hdrs);
}
err = 0;
msg = NULL;
/* noone is waiting for this message, enqueue it */
list_add_tail(&msg->m_list,&msq->q_messages);
msq->q_cbytes += msgsz;
msq->q_qnum++;
atomic_add(msgsz,&msg_bytes);
atomic_inc(&msg_hdrs);
}
err = 0;
msg = NULL;
out_unlock_free:
//解锁
msg_unlock(msqid);
out_free:
if(msg!=NULL)
free_msg(msg);
return err;
}
msg_unlock(msqid);
out_free:
if(msg!=NULL)
free_msg(msg);
return err;
}