`
yanwenjinhaha
  • 浏览: 15249 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

android vold架构详解(2)_Kernel层向上发送消息处理流程

 
阅读更多
学习罗老师,先上一张Kernel层向上发送消息处理流程的序列图,下面一点一点分析。



Step.17以前的步骤都已在上一篇分析文章里说明,这里不再重复。下面从Step.17开始分析。
Step.17 在main方法中,nm->start()方法里,开启Socket,监听Kernel层向上发送的消息
/**
 * Opens the kernel uevent netlink socket and starts the handler that reads
 * from it.
 *
 * Creates a PF_NETLINK / SOCK_DGRAM socket for NETLINK_KOBJECT_UEVENT, requests
 * a 64 KiB receive buffer, enables SO_PASSCRED, binds the socket to this
 * process's pid with all multicast groups selected, and finally hands the
 * descriptor to a new NetlinkHandler.
 *
 * @return 0 on success, -1 on failure. On any failure after the socket has
 *         been created the descriptor is closed again and mSock is reset to
 *         -1; errno still describes the step that failed.
 */
int NetlinkManager::start() {
    struct sockaddr_nl nladdr;
    int sz = 64 * 1024;
    int on = 1;
    int savedErrno;

    memset(&nladdr, 0, sizeof(nladdr));
    nladdr.nl_family = AF_NETLINK;
    nladdr.nl_pid = getpid();
    nladdr.nl_groups = 0xffffffff;

    // Create the netlink datagram socket; socket() returns its file descriptor.
    if ((mSock = socket(PF_NETLINK,
                        SOCK_DGRAM,NETLINK_KOBJECT_UEVENT)) < 0) {
        SLOGE("Unable to create uevent socket: %s", strerror(errno));
        return -1;
    }

    if (setsockopt(mSock, SOL_SOCKET, SO_RCVBUFFORCE, &sz, sizeof(sz)) < 0) {
        SLOGE("Unable to set uevent socket SO_RECBUFFORCE option: %s", strerror(errno));
        goto out;
    }

    if (setsockopt(mSock, SOL_SOCKET, SO_PASSCRED, &on, sizeof(on)) < 0) {
        SLOGE("Unable to set uevent socket SO_PASSCRED option: %s", strerror(errno));
        goto out;
    }

    if (bind(mSock, (struct sockaddr *) &nladdr, sizeof(nladdr)) < 0) {
        SLOGE("Unable to bind uevent socket: %s", strerror(errno));
        goto out;
    }

    mHandler = new NetlinkHandler(mSock);
    if (mHandler->start()) {
        SLOGE("Unable to start NetlinkHandler: %s", strerror(errno));
        // NOTE(review): mHandler is intentionally left allocated; its
        // destructor is not visible here, so deleting it is not known to be
        // safe after a failed start() -- confirm before adding a delete.
        goto out;
    }
    return 0;

out:
    // Release the descriptor instead of leaking it. close() may overwrite
    // errno, so keep the value that explains the original failure.
    savedErrno = errno;
    close(mSock);
    mSock = -1;
    errno = savedErrno;
    return -1;
}


Step.18~26 NetlinkHandler的继承关系,NetlinkHandler→NetlinkListener→SocketListener。依次创建NetlinkHandler、NetlinkListener、SocketListener的实例
NetlinkHandler的构造函数
// Constructs a handler around an already-open socket descriptor. All of the
// work is delegated to the NetlinkListener base-class constructor.
NetlinkHandler::NetlinkHandler(int listenerSocket)
    : NetlinkListener(listenerSocket)
{
}


NetlinkListener的构造函数
// Wraps the given socket in a SocketListener with the listen flag set to
// false, and selects the ASCII netlink format for later event decoding.
NetlinkListener::NetlinkListener(int socket)
    : SocketListener(socket, false)
{
    this->mFormat = NETLINK_FORMAT_ASCII;
}


SocketListener的构造函数
// Constructs a listener around an existing socket descriptor.
//   socketFd - descriptor to use; stored in mSock.
//   listen   - stored in mListen; startListener()/runListener() consult it to
//              decide whether to listen()/accept() on the socket.
// The listener starts without a socket name and with an empty client
// collection guarded by mClientsLock.
SocketListener::SocketListener(int socketFd, bool listen) {
    mSock = socketFd;
    mSocketName = NULL;
    mListen = listen;

    mClients = new SocketClientCollection();
    pthread_mutex_init(&mClientsLock, NULL);
}

最后实例化了一个SocketClientCollection。

Step.27 接着看看start()方法都干了什么。
NetlinkHandler.start
int NetlinkHandler::start() {
	// 调用父类startListener()方法
    return this->startListener();
}


Step.28 SocketListener的startListener()方法
/**
 * Prepares the listening socket and spawns the thread that runs
 * runListener().
 *
 * If a socket name is set, the descriptor is obtained through
 * android_get_control_socket(); otherwise the descriptor passed to the
 * constructor is used. When mListen is true the socket is put into listening
 * mode; when it is false the socket itself is registered as the only client.
 * A control pipe is then created (runListener() leaves its loop when the read
 * end becomes readable) and the listener thread is started.
 *
 * @return 0 on success, -1 on failure with errno describing the error.
 */
int SocketListener::startListener() {

    if (!mSocketName && mSock == -1) {
        SLOGE("Failed to start unbound listener");
        errno = EINVAL;
        return -1;
    } else if (mSocketName) {
        if ((mSock = android_get_control_socket(mSocketName)) < 0) {
            SLOGE("Obtaining file descriptor socket '%s' failed: %s",
                 mSocketName, strerror(errno));
            return -1;
        }
    }

    // Connection-based listeners mark mSock as a passive socket so that other
    // processes can connect to it.
    if (mListen && listen(mSock, 4) < 0) {
        SLOGE("Unable to listen on socket (%s)", strerror(errno));
        return -1;
    } else if (!mListen)
        // Not connection-based: wrap mSock in a SocketClient and add it to the
        // collection created by the constructor.
        mClients->push_back(new SocketClient(mSock, false));

    // Create the control pipe (read end and write end).
    if (pipe(mCtrlPipe)) {
        SLOGE("pipe failed (%s)", strerror(errno));
        return -1;
    }

    // pthread_create() reports failure through its return value and does not
    // set errno, so that value is what must be logged.
    int rc = pthread_create(&mThread, NULL, SocketListener::threadStart, this);
    if (rc) {
        SLOGE("pthread_create (%s)", strerror(rc));
        // No thread will ever read the pipe; close both ends rather than
        // leaking them.
        close(mCtrlPipe[0]);
        close(mCtrlPipe[1]);
        mCtrlPipe[0] = -1;
        mCtrlPipe[1] = -1;
        // Callers report the failure with strerror(errno).
        errno = rc;
        return -1;
    }

    return 0;
}


Step.32 SocketListener的threadStart方法
void *SocketListener::threadStart(void *obj) {
    SocketListener *me = reinterpret_cast<SocketListener *>(obj);
    // 继续调用自己的runListener()方法,
    // 马上进入最最重要的部分
    me->runListener();
    pthread_exit(NULL);
    return NULL;
}



Step.33 runListener()方法
/**
 * Main loop of the listener thread.
 *
 * Each iteration builds an fd_set from the listening socket (only when
 * mListen is set), the read end of the control pipe and every registered
 * client, then blocks in select(). The loop ends when the control pipe becomes
 * readable. New connections are accepted and registered as clients, and every
 * client whose descriptor is readable is passed to onDataAvailable().
 */
void SocketListener::runListener() {

    // Scratch list of the clients whose descriptors select() reported as
    // readable in the current iteration.
    SocketClientCollection *pendingList = new SocketClientCollection();

    // Loop until the control pipe signals that the listener should stop.
    while(1) {
        // Declared as `it` because every loop below refers to it by that name.
        SocketClientCollection::iterator it;
        fd_set read_fds;
        int rc = 0;
        int max = -1;

        // Rebuild the descriptor set from scratch on every iteration.
        FD_ZERO(&read_fds);

        if (mListen) {
            max = mSock;
            FD_SET(mSock, &read_fds);
        }

        FD_SET(mCtrlPipe[0], &read_fds);
        if (mCtrlPipe[0] > max)
            max = mCtrlPipe[0];

        pthread_mutex_lock(&mClientsLock);
        for (it = mClients->begin(); it != mClients->end(); ++it) {
            int fd = (*it)->getSocket();
            FD_SET(fd, &read_fds);
            if (fd > max)
                max = fd;
        }
        pthread_mutex_unlock(&mClientsLock);

        if ((rc = select(max + 1, &read_fds, NULL, NULL, NULL)) < 0) {
            if (errno == EINTR)
                continue;
            SLOGE("select failed (%s)", strerror(errno));
            sleep(1);
            continue;
        } else if (!rc)
            continue;

        // Activity on the control pipe ends the loop.
        if (FD_ISSET(mCtrlPipe[0], &read_fds))
            break;
        if (mListen && FD_ISSET(mSock, &read_fds)) {
            struct sockaddr addr;
            socklen_t alen;
            int c;

            // Retry accept() while it is interrupted by a signal.
            do {
                alen = sizeof(addr);
                c = accept(mSock, &addr, &alen);
            } while (c < 0 && errno == EINTR);
            if (c < 0) {
                SLOGE("accept failed (%s)", strerror(errno));
                sleep(1);
                continue;
            }
            pthread_mutex_lock(&mClientsLock);
            mClients->push_back(new SocketClient(c, true));
            pthread_mutex_unlock(&mClientsLock);
        }

        /* Add all active clients to the pending list first */
        pendingList->clear();
        pthread_mutex_lock(&mClientsLock);
        for (it = mClients->begin(); it != mClients->end(); ++it) {
            int fd = (*it)->getSocket();
            if (FD_ISSET(fd, &read_fds)) {
                // This client has data waiting; queue it for processing.
                pendingList->push_back(*it);
            }
        }
        pthread_mutex_unlock(&mClientsLock);

        /* Process the pending list, since it is owned by the thread,
         * there is no need to lock it */
        // A non-empty pending list means at least one client has data to
        // read; each one is handed to onDataAvailable().
        while (!pendingList->empty()) {
            /* Pop the first item from the list */
            it = pendingList->begin();
            SocketClient* c = *it;
            pendingList->erase(it);
            /* Process it, if false is returned and our sockets are
             * connection-based, remove and destroy it */
            if (!onDataAvailable(c) && mListen) {
                /* Remove the client from our array */
                pthread_mutex_lock(&mClientsLock);
                for (it = mClients->begin(); it != mClients->end(); ++it) {
                    if (*it == c) {
                        mClients->erase(it);
                        break;
                    }
                }
                pthread_mutex_unlock(&mClientsLock);
                /* Remove our reference to the client */
                c->decRef();
            }
        }
    }
    delete pendingList;
}


Step.34 取到Kernel的消息后,接下来处理该消息
NetlinkListener的onDataAvailable方法
/**
 * Reads one uevent message from the client's socket into mBuffer, decodes it
 * into a NetlinkEvent and forwards a successfully decoded event to onEvent().
 *
 * @param cli client whose socket has data available.
 * @return false if receiving failed; true otherwise, including the case where
 *         the message could not be decoded (that is only logged).
 */
bool NetlinkListener::onDataAvailable(SocketClient *cli)
{
    const int fd = cli->getSocket();

    ssize_t received = TEMP_FAILURE_RETRY(
            uevent_kernel_multicast_recv(fd, mBuffer, sizeof(mBuffer)));
    if (received < 0) {
        SLOGE("recvmsg failed (%s)", strerror(errno));
        return false;
    }

    NetlinkEvent *event = new NetlinkEvent();
    if (event->decode(mBuffer, received, mFormat)) {
        // Decoded successfully: let the subclass process the event.
        onEvent(event);
    } else {
        SLOGE("Error decoding NetlinkEvent");
    }
    delete event;

    return true;
}


Step.35 NetlinkHandler的onEvent方法,将消息解析后,传给VolumeManager来处理
/**
 * Routes a decoded netlink event by subsystem: events from the "block"
 * subsystem go to VolumeManager::handleBlockEvent(); events from any other
 * subsystem are dropped silently, and events without a subsystem are dropped
 * with a warning.
 *
 * @param evt the decoded event.
 */
void NetlinkHandler::onEvent(NetlinkEvent *evt) {
    VolumeManager *volumeManager = VolumeManager::Instance();
    const char *subsystem = evt->getSubsystem();

    if (subsystem == NULL) {
        SLOGW("No subsystem found in netlink event");
        return;
    }

    if (strcmp(subsystem, "block") != 0) {
        return;
    }

    volumeManager->handleBlockEvent(evt);
}


Step.36~41
消息经过各种Check后,将消息发到vold Socket中.
----------------------------
/**
 * Offers a block uevent to each registered volume in turn and stops at the
 * first volume whose handleBlockEvent() returns 0. The outcome is only
 * logged, and only when NETLINK_DEBUG is defined.
 *
 * @param evt the block-subsystem netlink event.
 */
void VolumeManager::handleBlockEvent(NetlinkEvent *evt) {
    const char *devpath = evt->findParam("DEVPATH");

    /* Lookup a volume to handle this device */
    VolumeCollection::iterator vol = mVolumes->begin();
    while (vol != mVolumes->end()) {
        if ((*vol)->handleBlockEvent(evt) == 0) {
#ifdef NETLINK_DEBUG
            SLOGD("Device '%s' event handled by volume %s\n", devpath, (*vol)->getLabel());
#endif
            return;
        }
        ++vol;
    }

    /* Reaching this point means no volume accepted the event */
#ifdef NETLINK_DEBUG
    SLOGW("No volumes handled block event for '%s'", devpath);
#endif
}
----------------------------

----------------------------
/**
 * Handles a block uevent if its DEVPATH begins with one of this volume's
 * registered paths.
 *
 * For a matching event the action selects the handler: add (which first
 * creates the /dev/block/vold/<major>:<minor> node), remove or change, each
 * split into a disk and a partition variant according to DEVTYPE. Any other
 * action is logged and ignored.
 *
 * Parameters the kernel normally supplies (DEVPATH, DEVTYPE, and MAJOR/MINOR
 * for add events) are checked before use, so a malformed event is rejected
 * instead of passing NULL to strncmp()/strcmp()/atoi().
 *
 * @param evt the block-subsystem netlink event.
 * @return 0 if the event belongs to this volume and was dispatched or
 *         deliberately ignored; -1 with errno = ENODEV if no path matched (or
 *         DEVPATH is missing); -1 with errno = EINVAL if a matching event
 *         lacks a required parameter.
 */
int DirectVolume::handleBlockEvent(NetlinkEvent *evt) {
    const char *dp = evt->findParam("DEVPATH");

    if (!dp) {
        /* Without a device path the event cannot match any of our paths */
        errno = ENODEV;
        return -1;
    }

    PathCollection::iterator  it;
    for (it = mPaths->begin(); it != mPaths->end(); ++it) {
        if (!strncmp(dp, *it, strlen(*it))) {
            /* We can handle this disk */
            int action = evt->getAction();
            const char *devtype = evt->findParam("DEVTYPE");

            if (action != NetlinkEvent::NlActionAdd &&
                action != NetlinkEvent::NlActionRemove &&
                action != NetlinkEvent::NlActionChange) {
                SLOGW("Ignoring non add/remove/change event");
                return 0;
            }

            if (!devtype) {
                /* Cannot tell disk from partition without DEVTYPE */
                SLOGW("Block event for '%s' is missing DEVTYPE", dp);
                errno = EINVAL;
                return -1;
            }

            if (action == NetlinkEvent::NlActionAdd) {
                const char *majorStr = evt->findParam("MAJOR");
                const char *minorStr = evt->findParam("MINOR");

                if (!majorStr || !minorStr) {
                    SLOGW("Block add event for '%s' is missing MAJOR/MINOR", dp);
                    errno = EINVAL;
                    return -1;
                }

                int major = atoi(majorStr);
                int minor = atoi(minorStr);
                char nodepath[255];

                snprintf(nodepath,
                         sizeof(nodepath), "/dev/block/vold/%d:%d",
                         major, minor);
                /* A failure to create the node is logged but does not stop
                 * the add handling below */
                if (createDeviceNode(nodepath, major, minor)) {
                    SLOGE("Error making device node '%s' (%s)", nodepath,
                                                               strerror(errno));
                }
                if (!strcmp(devtype, "disk")) {
                    handleDiskAdded(dp, evt);
                } else {
                    handlePartitionAdded(dp, evt);
                }
            } else if (action == NetlinkEvent::NlActionRemove) {
                if (!strcmp(devtype, "disk")) {
                    handleDiskRemoved(dp, evt);
                } else {
                    handlePartitionRemoved(dp, evt);
                }
            } else {
                /* Only NlActionChange can reach this branch */
                if (!strcmp(devtype, "disk")) {
                    handleDiskChanged(dp, evt);
                } else {
                    handlePartitionChanged(dp, evt);
                }
            }

            return 0;
        }
    }
    errno = ENODEV;
    return -1;
}
----------------------------

----------------------------
void DirectVolume::handleDiskAdded(const char *devpath, NetlinkEvent *evt) {
    mDiskMajor = atoi(evt->findParam("MAJOR"));
    mDiskMinor = atoi(evt->findParam("MINOR"));

    const char *tmp = evt->findParam("NPARTS");
    if (tmp) {
        mDiskNumParts = atoi(tmp);
    } else {
        SLOGW("Kernel block uevent missing 'NPARTS'");
        mDiskNumParts = 1;
    }

// FUJITSU TEN:2014-01-14 #55703 start
    snprintf(mDevPath, 1024, "/sys/%s", evt->findParam("DEVPATH"));
// FUJITSU TEN:2014-01-14 #55703 end

    char msg[255];

    int partmask = 0;
    int i;
    for (i = 1; i <= mDiskNumParts; i++) {
        partmask |= (1 << i);
    }
    mPendingPartMap = partmask;

    if (mDiskNumParts == 0) {
#ifdef PARTITION_DEBUG
        SLOGD("Dv::diskIns - No partitions - good to go son!");
#endif
        setState(Volume::State_Idle);
    } else {
#ifdef PARTITION_DEBUG
        SLOGD("Dv::diskIns - waiting for %d partitions (mask 0x%x)",
             mDiskNumParts, mPendingPartMap);
#endif
        setState(Volume::State_Pending);
    }

    snprintf(msg, sizeof(msg), "Volume %s %s disk inserted (%d:%d)",
             getLabel(), getMountpoint(), mDiskMajor, mDiskMinor);
    mVm->getBroadcaster()->sendBroadcast(ResponseCode::VolumeDiskInserted,
                                             msg, false);
}
----------------------------

----------------------------
void SocketListener::sendBroadcast(int code, const char *msg, bool addErrno) {
    pthread_mutex_lock(&mClientsLock);
    SocketClientCollection::iterator i;

    for (i = mClients->begin(); i != mClients->end(); ++i) {
        if ((*i)->sendMsg(code, msg, addErrno)) {
            SLOGW("Error sending broadcast (%s)", strerror(errno));
        }
    }
    pthread_mutex_unlock(&mClientsLock);
}
----------------------------

----------------------------
/**
 * Formats "<code> <msg>" (code zero-padded to at least three digits) and
 * sends it through sendMsg(const char *). When addErrno is true the text of
 * the current errno is appended in parentheses.
 *
 * @param code     numeric code placed at the start of the message.
 * @param msg      message text.
 * @param addErrno whether to append " (<strerror(errno)>)".
 * @return the result of sendMsg(const char *).
 */
int SocketClient::sendMsg(int code, const char *msg, bool addErrno) {
    const char *fmt = addErrno ? "%.3d %s (%s)" : "%.3d %s";
    const char *arg = addErrno ? strerror(errno) : NULL;

    /* First pass: a one-byte probe buffer makes snprintf report the length
     * the full message needs */
    char probe[1];
    int needed = snprintf(probe, sizeof probe, fmt, code, msg, arg);

    /* Second pass: format into a stack buffer of exactly that size */
    char *out = static_cast<char *>(alloca(needed + 1));
    snprintf(out, needed + 1, fmt, code, msg, arg);

    /* Send the zero-terminated message */
    return sendMsg(out);
}

/**
 * Sends a C string, including its terminating NUL byte, to the client.
 *
 * @param msg zero-terminated message.
 * @return 0 on success; -1 if the socket is invalid (errno is set to
 *         EHOSTUNREACH) or if sendData() fails.
 */
int SocketClient::sendMsg(const char *msg) {
    if (mSocket < 0) {
        errno = EHOSTUNREACH;
        return -1;
    }

    // The +1 makes the terminating null character part of the payload.
    const int length = strlen(msg) + 1;
    if (sendData(msg, length) == 0) {
        return 0;
    }

    SLOGW("Unable to send msg '%s'", msg);
    return -1;
}

/**
 * Writes exactly len bytes to the client socket, looping over partial writes
 * and retrying writes interrupted by a signal. The whole transfer runs under
 * mWriteMutex.
 *
 * @param data buffer to send.
 * @param len  number of bytes to send; 0 returns success immediately.
 * @return 0 once every byte has been written; -1 on a write error, or on a
 *         zero-length write (errno is then set to EIO).
 */
int SocketClient::sendData(const void* data, int len) {
    if (len == 0) {
        return 0;
    }

    const char *cursor = (const char*) data;
    int remaining = len;

    pthread_mutex_lock(&mWriteMutex);
    while (remaining > 0) {
        int written = write(mSocket, cursor, remaining);

        if (written > 0) {
            /* Partial or full write: advance past the bytes just sent */
            cursor += written;
            remaining -= written;
            continue;
        }

        if (written < 0 && errno == EINTR) {
            /* Interrupted before anything was written: try again */
            continue;
        }

        /* Unrecoverable: drop the lock before reporting the failure */
        pthread_mutex_unlock(&mWriteMutex);
        if (written == 0) {
            SLOGW("0 length write :(");
            errno = EIO;
        } else {
            SLOGW("write error (%s)", strerror(errno));
        }
        return -1;
    }
    pthread_mutex_unlock(&mWriteMutex);

    return 0;
}
----------------------------


从Library层接收到Kernel层消息,到将其发送到Application Framework层的处理流程,就分析到这里了。
后面是Application Framework层如何获取到这个消息

MountService实例化时,创建了一个用来监听vold Socket的Connector
/**
 * Constructs the MountService (excerpt; the "......" lines stand for code
 * omitted by the article).
 *
 * The part shown creates the NativeDaemonConnector for the "vold" socket and
 * starts it on its own thread.
 */
public MountService(Context context) {
    ......
    /*
     * Create the connection to vold with a maximum queue of twice the
     * amount of containers we'd ever expect to have. This keeps an
     * "asec list" from blocking a thread repeatedly.
     */
    // Create the connector used to listen on the vold socket.
    mConnector = new NativeDaemonConnector(this, "vold", MAX_CONTAINERS * 2, VOLD_TAG);
    mReady = false;
    // Start mConnector on a new thread. According to the surrounding article,
    // NativeDaemonConnector.run() calls listenToSocket(), which begins
    // listening on the vold socket.
    Thread thread = new Thread(mConnector, VOLD_TAG);
    thread.start();
	......
}


NativeDaemonConnector的run方法中调用了listenToSocket方法,开始监听vold Socket
/**
 * Reads NUL-terminated events from the daemon socket and dispatches them
 * (excerpt; the "......" lines stand for code omitted by the article,
 * including the socket setup and the end of the try statement).
 *
 * Each event is split at its first space and the leading token is parsed as
 * a numeric code. Codes at or above ResponseCode.UnsolicitedInformational are
 * posted to mCallbackHandler; all other events are put on mResponseQueue.
 * Events whose code is not a number are logged and skipped.
 *
 * @throws IOException declared by the method; the reads from inputStream are
 *         the visible source.
 */
private void listenToSocket() throws IOException {
    LocalSocket socket = null;

    try {
        ......

        while (true) {
            // Append new bytes after any partial event carried over in
            // buffer[0..start).
            int count = inputStream.read(buffer, start, BUFFER_SIZE - start);
            // A negative count ends the read loop.
            if (count < 0) break;

            // Add our starting point to the count and reset the start.
            count += start;
            start = 0;

            for (int i = 0; i < count; i++) {
                // A zero byte marks the end of one event.
                if (buffer[i] == 0) {
                    String event = new String(buffer, start, i - start);
                    if (LOCAL_LOGD) Slog.d(TAG, String.format("RCV <- {%s}", event));

                    // tokens[0] is the numeric code; the rest of the line
                    // stays in one piece.
                    String[] tokens = event.split(" ", 2);
                    try {
                        int code = Integer.parseInt(tokens[0]);

                        if (code >= ResponseCode.UnsolicitedInformational) {
                        	// Post the event to the callback handler.
                            mCallbackHandler.sendMessage(
                                    mCallbackHandler.obtainMessage(code, event));
                        } else {
                            try {
                                mResponseQueue.put(event);
                            } catch (InterruptedException ex) {
                                Slog.e(TAG, "Failed to put response onto queue", ex);
                            }
                        }
                    } catch (NumberFormatException nfe) {
                        Slog.w(TAG, String.format("Bad msg (%s)", event));
                    }
                    // The next event starts right after this terminator.
                    start = i + 1;
                }
            }
            ......
        }
    } 
    ......
}


/**
 * Delivers an event posted by the socket reader to the registered callbacks.
 *
 * msg.what carries the numeric code and msg.obj the raw event string; the
 * callbacks also receive the event split on single spaces. An unhandled event
 * or an exception thrown while handling it is logged, not propagated.
 *
 * @param msg message whose obj field is the raw event String.
 * @return always true.
 */
public boolean handleMessage(Message msg) {
    final String rawEvent = (String) msg.obj;
    try {
        final String[] cooked = rawEvent.split(" ");
        final boolean handled = mCallbacks.onEvent(msg.what, rawEvent, cooked);
        if (!handled) {
            Slog.w(TAG, String.format("Unhandled event '%s'", rawEvent));
        }
    } catch (Exception e) {
        Slog.e(TAG, String.format("Error handling '%s'", rawEvent), e);
    }
    return true;
}


mCallbacks就是NativeDaemonConnector实例化时传递进来的,
它其实就是MountService。所以回到MountService的onEvent方法
/**
 * Callback from NativeDaemonConnector (excerpt; the "......" lines stand for
 * code omitted by the article).
 *
 * Dispatches on the vold response code: volume state changes take one branch,
 * disk inserted / disk removed / bad removal share another.
 *
 * @param code   numeric response code of the event.
 * @param raw    the complete event string.
 * @param cooked the event split on spaces.
 * @return false when the code matches none of the branches shown; true at the
 *         end of the method otherwise (the omitted code may return earlier).
 */
public boolean onEvent(int code, String raw, String[] cooked) {
    ......
    if (code == VoldResponseCode.VolumeStateChange) {
       ......
    } else if ((code == VoldResponseCode.VolumeDiskInserted) ||
               (code == VoldResponseCode.VolumeDiskRemoved) ||
               (code == VoldResponseCode.VolumeBadRemoval)) {
        ......
    } else {
        // Not a code this callback handles.
        return false;
    }

    return true;
}


OK!!! 这里就开始分发处理各种消息了。

明天开始FrameWork层向下发送消息处理流程
  • 大小: 480 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics