热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Android7.0MessageQueue详解

这篇文章主要为大家详细介绍了Android7.0MessageQueue的相关资料,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

Android中的消息处理机制大量依赖于Handler。每个Handler都有对应的Looper,用于不断地从对应的MessageQueue中取出消息处理。

一直以来,觉得MessageQueue应该是Java层的抽象,然而事实上MessageQueue的主要部分在Native层中。
自己对MessageQueue在Native层的工作不太熟悉,借此机会分析一下。

一、MessageQueue的创建

当需要使用Looper时,我们会调用Looper的prepare函数:

public static void prepare() {
 prepare(true);
}

private static void prepare(boolean quitAllowed) {
 if (sThreadLocal.get() != null) {
 throw new RuntimeException("Only one Looper may be created per thread");
 }
 //sThreadLocal为线程本地存储区;每个线程仅有一个Looper
 sThreadLocal.set(new Looper(quitAllowed));
}

private Looper(boolean quitAllowed) {
 //创建出MessageQueue
 mQueue = new MessageQueue(quitAllowed);
 mThread = Thread.currentThread();
}

1 NativeMessageQueue

我们看看MessageQueue的构造函数:

MessageQueue(boolean quitAllowed) {
 mQuitAllowed = quitAllowed;
 //mPtr的类型为long?
 mPtr = nativeInit();
}

MessageQueue的构造函数中就调用了native函数,我们看看android_os_MessageQueue.cpp中的实现:

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
 //MessageQueue的Native层实体
 NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
 ............
 //这里应该类似与将指针转化成long类型,放在Java层保存;估计Java层使用时,会在native层将long变成指针,就可以操作队列了
 return reinterpret_cast(nativeMessageQueue);
}

我们跟进NativeMessageQueue的构造函数:

NativeMessageQueue::NativeMessageQueue() :
 mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
 //创建一个Native层的Looper,也是线程唯一的
 mLooper = Looper::getForThread();
 if (mLooper == NULL) {
 mLooper = new Looper(false);
 Looper::setForThread(mLooper);
 }
}

从代码来看,Native层和Java层均有Looper对象,应该都是操作MessageQueue的。MessageQueue在Java层和Native层有各自的存储结构,分别存储Java层和Native层的消息。

2 Native层的looper

我们看看Native层looper的构造函数:

Looper::Looper(bool allowNonCallbacks) :
 mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
 mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
 mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
 //此处创建了个fd
 mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
 .......
 rebuildEpollLocked();
}

在native层中,MessageQueue中的Looper初始化时,还调用了rebuildEpollLocked函数,我们跟进一下:

void Looper::rebuildEpollLocked() {
 // Close old epoll instance if we have one.
 if (mEpollFd >= 0) {
 close(mEpollFd);
 }

 // Allocate the new epoll instance and register the wake pipe.
 mEpollFd = epoll_create(EPOLL_SIZE_HINT);
 ............
 struct epoll_event eventItem;
 memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
 eventItem.events = EPOLLIN;
 eventItem.data.fd = mWakeEventFd;
 //在mEpollFd上监听mWakeEventFd上是否有数据到来
 int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
 ...........
 for (size_t i = 0; i 

从native层的looper来看,我们知道Native层依赖于epoll来驱动事件处理。此处我们先保留一下大致的映像,后文详细分析。

二、使用MessageQueue

1 写入消息
 Android中既可以在Java层向MessageQueue写入消息,也可以在Native层向MessageQueue写入消息。我们分别看一下对应的操作流程。

1.1 Java层写入消息
Java层向MessageQueue写入消息,依赖于enqueueMessage函数:

boolean enqueueMessage(Message msg, long when) {
 if (msg.target == null) {
 throw new IllegalArgumentException("Message must have a target.");
 }
 if (msg.isInUse()) {
 throw new IllegalStateException(msg + " This message is already in use.");
 }

 synchronized (this) {
 if (mQuitting) {
  .....
  return false;
 }

 msg.markInUse();
 msg.when = when;
 Message p = mMessages;
 boolean needWake;
 if (p == null || when == 0 || when 

上述代码比较简单,主要就是将新加入的Message按执行时间插入到原有的队列中,然后根据情况调用nativeAwake函数。

我们跟进一下nativeAwake:

void NativeMessageQueue::wake() {
 mLooper->wake();
}

void Looper::wake() {
 uint64_t inc = 1;
 //就是向mWakeEventFd写入数据
 ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
 .............
}

在native层的looper初始化时,我们提到过native层的looper将利用epoll来驱动事件,其中构造出的epoll句柄就监听了mWakeEventFd。
实际上从MessageQueue中取出数据时,若没有数据到来,就会利用epoll进行等待;因此当Java层写入消息时,将会将唤醒处于等待状态的MessageQueue。
在后文介绍从MessageQueue中提取消息时,将再次分析这个问题。

1.2 Native层写入消息
Native层写入消息,依赖于Native层looper的sendMessage函数:

void Looper::sendMessage(const sp& handler, const Message& message) {
 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
 sendMessageAtTime(now, handler, message);
}

void Looper::sendMessageAtTime(nsecs_t uptime, const sp& handler,
 const Message& message) {
 size_t i = 0;
 {
 AutoMutex _l(mLock);

 //同样需要按时间插入
 size_t messageCount = mMessageEnvelopes.size();
 while (i = mMessageEnvelopes.itemAt(i).uptime) {
  i += 1;
 }

 //将message包装成一个MessageEnvelope对象
 MessageEnvelope messageEnvelope(uptime, handler, message);
 mMessageEnvelopes.insertAt(messageEnvelope, i, 1);

 // Optimization: If the Looper is currently sending a message, then we can skip
 // the call to wake() because the next thing the Looper will do after processing
 // messages is to decide when the next wakeup time should be. In fact, it does
 // not even matter whether this code is running on the Looper thread.
 if (mSendingMessage) {
  return;
 }
 }
 // Wake the poll loop only when we enqueue a new message at the head.
 if (i == 0) {
 //若插入在队列头部,同样利用wake函数触发epoll唤醒
 wake();
 }
}

以上就是向MessageQueue中加入消息的主要流程,接下来我们看看从MessageQueue中取出消息的流程。

2、提取消息
当Java层的Looper对象调用loop函数时,就开始使用MessageQueue提取消息了:

public static void loop() {
 final Looper me = myLooper();
 .......
 for (;;) {
 Message msg = queue.next(); // might block
 .......
 try {
  //调用Message的处理函数进行处理
  msg.target.dispatchMessage(msg);
 }........
 }
}

此处我们看看MessageQueue的next函数:

Message next() {
 //mPtr保存了NativeMessageQueue的指针
 final long ptr = mPtr;
 .......
 int pendingIdleHandlerCount = -1; // -1 only during first iteration
 int nextPollTimeoutMillis = 0;

 for (;;) {
 if (nextPollTimeoutMillis != 0) {
  //会调用Native函数,最终调用IPCThread的talkWithDriver,将数据写入Binder驱动或者读取一次数据
  //不知道在此处进行这个操作的理由?
  Binder.flushPendingCommands();
 }

 //处理native层的数据,此处会利用epoll进行blocked
 nativePollOnce(ptr, nextPollTimeoutMillis);

 synchronized (this) {
  final long now = SystemClock.uptimeMillis();
  Message prevMsg = null;
  Message msg = mMessages;
  //下面其实就是找出下一个异步处理类型的消息;异步处理类型的消息,才含有对应的执行函数
  if (msg != null && msg.target == null) {
  // Stalled by a barrier. Find the next asynchronous message in the queue.
  do {
   prevMsg = msg;
   msg = msg.next;
  } while (msg != null && !msg.isAsynchronous());
  }

  if (msg != null) {
  if (now 

整个提取消息的过程,大致上如上图所示。
可以看到在Java层,Looper除了要取出MessageQueue的消息外,还会在队列空闲期执行IdleHandler定义的函数。

2.1 nativePollOnce
现在唯一的疑点是nativePollOnce是如何处理Native层数据的,我们看看对应的native函数:

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
 jlong ptr, jint timeoutMillis) {
 //果然Java层调用native层MessageQueue时,将long类型的ptr变为指针
 NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr);
 nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
 mPollEnv = env;
 mPollObj = pollObj;
 //最后还是进入到Native层looper的pollOnce函数
 mLooper->pollOnce(timeoutMillis);
 mPollObj = NULL;
 mPollEnv = NULL;

 if (mExceptionObj) {
 .........
 }
}

看看native层looper的pollOnce函数:

//timeoutMillis为超时等待时间。值为-1时,表示无限等待直到有事件到来;值为0时,表示无需等待
//outFd此时为null,含义是:存储产生事件的文件句柄
//outEvents此时为null,含义是:存储outFd上发生了哪些事件,包括可读、可写、错误和中断
//outData此时为null,含义是:存储上下文数据,其实调用时传入的参数
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
 int result = 0;
 for (;;) {
 //处理response,目前我们先不关注response的内含
 while (mResponseIndex = 0) {
  int fd = response.request.fd;
  int events = response.events;
  void* data = response.request.data;

  if (outFd != NULL) *outFd = fd;
  if (outEvents != NULL) *outEvents = events;
  if (outData != NULL) *outData = data;
  return ident;
  }
 }

 //根据pollInner的结果,进行操作
 if (result != 0) {
  if (outFd != NULL) *outFd = 0;
  if (outEvents != NULL) *outEvents = 0;
  if (outData != NULL) *outData = NULL;
  return result;
 }

 //主力还是靠pollInner
 result = pollInner(timeoutMillis);
 }
}

跟进一下pollInner函数:

int Looper::pollInner(int timeoutMillis) {
 // Adjust the timeout based on when the next message is due.
 //timeoutMillis是Java层事件等待事件
 //native层维持了native message的等待时间
 //此处其实就是选择最小的等待时间
 if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
  nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
  int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
  if (messageTimeoutMillis >= 0
  && (timeoutMillis <0 || messageTimeoutMillis = 0) {
  int events = 0;
  if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
  if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
  if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
  if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
  //存储这个fd对应的response
  pushResponse(events, mRequests.valueAt(requestIndex));
  } else {
  ..........
  }
 }
 }

Done:

 // Invoke pending message callbacks.
 mNextMessageUptime = LLONG_MAX;
 //处理Native层的Message
 while (mMessageEnvelopes.size() != 0) {
 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
 const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
 if (messageEnvelope.uptime <= now) {
  // Remove the envelope from the list.
  // We keep a strong reference to the handler until the call to handleMessage
  // finishes. Then we drop it so that the handler can be deleted *before*
  // we reacquire our lock.
  {
  sp handler = messageEnvelope.handler;
  Message message = messageEnvelope.message;
  mMessageEnvelopes.removeAt(0);
  mSendingMessage = true;
  mLock.unlock();

  //处理Native Message
  handler->handleMessage(message);
  }
  mLock.lock();
  mSendingMessage = false;
  result = POLL_CALLBACK;
 } else {
  // The last message left at the head of the queue determines the next wakeup time.
  mNextMessageUptime = messageEnvelope.uptime;
  break;
 }
 }

 // Release lock.
 mLock.unlock();

 //处理带回调函数的response
 for (size_t i = 0; i handleEvent(fd, events, data);
  if (callbackResult == 0) {
  removeFd(fd, response.request.seq);
  }

  response.request.callback.clear();
  result = POLL_CALLBACK;
 }
 }
 return result;
}

说实话native层的代码写的很乱,该函数的功能比较多。
如上图所示,在nativePollOnce中利用epoll监听是否有数据到来,然后处理native message、native response。

最后,我们看看如何在native层中加入request。

3 添加监控请求
native层增加request依赖于looper的接口addFd:

//fd表示需要监听的句柄
//ident的含义还没有搞明白
//events表示需要监听的事件,例如EVENT_INPUT、EVENT_OUTPUT、EVENT_ERROR和EVENT_HANGUP中的一个或多个
//callback为事件发生后的回调函数
//data为回调函数对应的参数
int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {
 return addFd(fd, ident, events, callback &#63; new SimpleLooperCallback(callback) : NULL, data);
}

结合上文native层轮询队列的操作,我们大致可以知道:addFd的目的,就是让native层的looper监控新加入的fd上是否有指定事件发生。
如果发生了指定的事件,就利用回调函数及参数构造对应的response。
native层的looper处理response时,就可以执行对应的回调函数了。

看看实际的代码:

int Looper::addFd(int fd, int ident, int events, const sp& callback, void* data) {
 ........
 {
 AutoMutex _l(mLock);

 //利用参数构造一个request
 Request request;
 request.fd = fd;
 request.ident = ident;
 request.events = events;
 request.seq = mNextRequestSeq++;
 request.callback = callback;
 request.data = data;
 if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1

 struct epoll_event eventItem;
 request.initEventItem(&eventItem);

 //判断之前是否已经利用该fd构造过Request
 ssize_t requestIndex = mRequests.indexOfKey(fd);
 if (requestIndex <0) {
  //mEpollFd新增一个需监听fd
  int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
  .......
  mRequests.add(fd, request);
 } else {
  //mEpollFd修改旧的fd对应的监听事件
  int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
  if (epollResult <0) {
  if (errno == ENOENT) {
   // Tolerate ENOENT because it means that an older file descriptor was
   // closed before its callback was unregistered and meanwhile a new
   // file descriptor with the same number has been created and is now
   // being registered for the first time. 
   epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
   .......
  }
  //发生错误重新加入时,安排EpollRebuildLocked,将让epollFd重新添加一次待监听的fd
  scheduleEpollRebuildLocked();
  }
  mRequests.replaceValueAt(requestIndex, request);
 }
 }
}

对加入监控请求的处理,在上文介绍pollInner函数时已做分析,此处不再赘述。

三、总结

1、流程总结


MessageQueue的整个流程包括了Java部分和Native部分,从图中可以看出Native层的比重还是很大的。我们结合上图回忆一下整个MessageQueue对应的处理流程:
1、Java层创建Looper对象时,将会创建Java层的MessageQueue;Java层的MessageQueue初始化时,将利用Native函数创建出Native层的MessageQueue。

2、Native层的MessageQueue初始化后,将创建对应的Native Looper对象。Native对象初始化时,将创建对应epollFd和WakeEventFd。其中,epollFd将作为epoll的监听句柄,初始时epollFd仅监听WakeEventFd。

3、图中红色线条为Looper从MessageQueue中取消息时,处理逻辑的流向。
3.1、当Java层的Looper开始循环时,首先需要通过JNI函数调用Native Looper进行pollOnce的操作。

3.2、Native Looper开始运行后,需要等待epollFd被唤醒。当epollFd等待超时或监听的句柄有事件到来,Native Looper就可以开始处理事件了。

3.3、在Native层,Native Looper将先处理Native MessageQueue中的消息,再调用Response对应的回调函数。

3.4、本次循环中,Native层事件处理完毕后,才开始处理Java层中MessageQueue的消息。若MessageQueue中没有消息需要处理,并且MessageQueue中存在IdleHandler时,将调用IdleHandler定义的处理函数。

图中蓝色部分为对应的函数调用:
在Java层:
利用MessageQueue的addIdleHandler,可以为MessageQueue增加IdleHandler;
利用MessageQueue的enqueueMessage,可以向MessageQueue增加消息;必要时将利用Native函数向Native层的WakeEventFd写入消息,以唤醒epollFd。

在Native层:
利用looper:sendMessage,可以为Native MessageQueue增加消息;同样,要时将向Native层的WakeEventFd写入消息,以唤醒epollFd;
利用looper:addFd,可以向Native Looper注册监听请求,监听请求包含需监听的fd、监听的事件及对应的回调函数等,监听请求对应的fd将被成为epollFd监听的对象。当被监听的fd发生对应的事件后,将会唤醒epollFd,此时将生成对应response加入的response List中,等待处理。一旦response被处理,就会调用对应的回调函数。

2、注意事项
MessageQueue在Java层和Native层有各自的存储结构,可以分别增加消息。从处理逻辑来看,会优先处理native层的Message,然后处理Native层生成的response,最后才是处理Java层的Message。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。


推荐阅读
  • 知识图谱——机器大脑中的知识库
    本文介绍了知识图谱在机器大脑中的应用,以及搜索引擎在知识图谱方面的发展。以谷歌知识图谱为例,说明了知识图谱的智能化特点。通过搜索引擎用户可以获取更加智能化的答案,如搜索关键词"Marie Curie",会得到居里夫人的详细信息以及与之相关的历史人物。知识图谱的出现引起了搜索引擎行业的变革,不仅美国的微软必应,中国的百度、搜狗等搜索引擎公司也纷纷推出了自己的知识图谱。 ... [详细]
  • Android Studio Bumblebee | 2021.1.1(大黄蜂版本使用介绍)
    本文介绍了Android Studio Bumblebee | 2021.1.1(大黄蜂版本)的使用方法和相关知识,包括Gradle的介绍、设备管理器的配置、无线调试、新版本问题等内容。同时还提供了更新版本的下载地址和启动页面截图。 ... [详细]
  • 云原生边缘计算之KubeEdge简介及功能特点
    本文介绍了云原生边缘计算中的KubeEdge系统,该系统是一个开源系统,用于将容器化应用程序编排功能扩展到Edge的主机。它基于Kubernetes构建,并为网络应用程序提供基础架构支持。同时,KubeEdge具有离线模式、基于Kubernetes的节点、群集、应用程序和设备管理、资源优化等特点。此外,KubeEdge还支持跨平台工作,在私有、公共和混合云中都可以运行。同时,KubeEdge还提供数据管理和数据分析管道引擎的支持。最后,本文还介绍了KubeEdge系统生成证书的方法。 ... [详细]
  • 本文介绍了设计师伊振华受邀参与沈阳市智慧城市运行管理中心项目的整体设计,并以数字赋能和创新驱动高质量发展的理念,建设了集成、智慧、高效的一体化城市综合管理平台,促进了城市的数字化转型。该中心被称为当代城市的智能心脏,为沈阳市的智慧城市建设做出了重要贡献。 ... [详细]
  • 本文介绍了数据库的存储结构及其重要性,强调了关系数据库范例中将逻辑存储与物理存储分开的必要性。通过逻辑结构和物理结构的分离,可以实现对物理存储的重新组织和数据库的迁移,而应用程序不会察觉到任何更改。文章还展示了Oracle数据库的逻辑结构和物理结构,并介绍了表空间的概念和作用。 ... [详细]
  • IhaveconfiguredanactionforaremotenotificationwhenitarrivestomyiOsapp.Iwanttwodiff ... [详细]
  • Python字典推导式及循环列表生成字典方法
    本文介绍了Python中使用字典推导式和循环列表生成字典的方法,包括通过循环列表生成相应的字典,并给出了执行结果。详细讲解了代码实现过程。 ... [详细]
  • 本文讨论了在Windows 8上安装gvim中插件时出现的错误加载问题。作者将EasyMotion插件放在了正确的位置,但加载时却出现了错误。作者提供了下载链接和之前放置插件的位置,并列出了出现的错误信息。 ... [详细]
  • CSS3选择器的使用方法详解,提高Web开发效率和精准度
    本文详细介绍了CSS3新增的选择器方法,包括属性选择器的使用。通过CSS3选择器,可以提高Web开发的效率和精准度,使得查找元素更加方便和快捷。同时,本文还对属性选择器的各种用法进行了详细解释,并给出了相应的代码示例。通过学习本文,读者可以更好地掌握CSS3选择器的使用方法,提升自己的Web开发能力。 ... [详细]
  • “你永远都不知道明天和‘公司的意外’哪个先来。”疫情期间,这是我们最战战兢兢的心情。但是显然,有些人体会不了。这份行业数据,让笔者“柠檬” ... [详细]
  • 本文主要解析了Open judge C16H问题中涉及到的Magical Balls的快速幂和逆元算法,并给出了问题的解析和解决方法。详细介绍了问题的背景和规则,并给出了相应的算法解析和实现步骤。通过本文的解析,读者可以更好地理解和解决Open judge C16H问题中的Magical Balls部分。 ... [详细]
  • 本文讨论了使用差分约束系统求解House Man跳跃问题的思路与方法。给定一组不同高度,要求从最低点跳跃到最高点,每次跳跃的距离不超过D,并且不能改变给定的顺序。通过建立差分约束系统,将问题转化为图的建立和查询距离的问题。文章详细介绍了建立约束条件的方法,并使用SPFA算法判环并输出结果。同时还讨论了建边方向和跳跃顺序的关系。 ... [详细]
  • 生成对抗式网络GAN及其衍生CGAN、DCGAN、WGAN、LSGAN、BEGAN介绍
    一、GAN原理介绍学习GAN的第一篇论文当然由是IanGoodfellow于2014年发表的GenerativeAdversarialNetworks(论文下载链接arxiv:[h ... [详细]
  • [译]技术公司十年经验的职场生涯回顾
    本文是一位在技术公司工作十年的职场人士对自己职业生涯的总结回顾。她的职业规划与众不同,令人深思又有趣。其中涉及到的内容有机器学习、创新创业以及引用了女性主义者在TED演讲中的部分讲义。文章表达了对职业生涯的愿望和希望,认为人类有能力不断改善自己。 ... [详细]
  • 本文介绍了在Win10上安装WinPythonHadoop的详细步骤,包括安装Python环境、安装JDK8、安装pyspark、安装Hadoop和Spark、设置环境变量、下载winutils.exe等。同时提醒注意Hadoop版本与pyspark版本的一致性,并建议重启电脑以确保安装成功。 ... [详细]
author-avatar
手机用户2602926907
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有