热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

ApacheCurator之InterProcessMutex源码分析(四)

上篇文章通过秒购的例子对InterProcessMutex锁有了初步认识,本文将通过对源码进行分析带你进入分布式锁的世界。老规矩先上图,为了更清晰的了解获取锁,释放锁的过程,下图简

上篇文章通过秒购的例子对InterProcessMutex锁有了初步认识,本文将通过对源码进行分析带你进入分布式锁的世界。

老规矩先上图,为了更清晰的了解获取锁,释放锁的过程,下图简化了一些细节,使整个流程更加通畅。

Apache Curator之InterProcessMutex源码分析(四)

下面将逐个方法去分析。

InterProcessMutex.acquire()

 

 1     @Override
//获得分布式锁,阻塞
2 public void acquire() throws Exception { 3 if (!internalLock(-1, null)) { 4 throw new IOException("Lost connection while trying to acquire lock: " + basePath); 5 } 6 } 7
//获得分布式锁,在指定的时间内阻塞,推荐使用 8 @Override 9 public boolean acquire(long time, TimeUnit unit) throws Exception { 10 return internalLock(time, unit); 11 }

 

InterProcessMutex.internalLock(long time, TimeUnit unit)

 1     private boolean internalLock(long time, TimeUnit unit) throws Exception {
 2 
 3         Thread currentThread = Thread.currentThread();
 4         //获得当前线程的锁对象
 5         LockData lockData = threadData.get(currentThread);
 6         if (lockData != null) {
 7             // re-entering
//如果锁不为空,当前线程已经获得锁,可重入锁,lockCount++ 8 lockData.lockCount.incrementAndGet(); 9 return true; 10 } 11
//获取锁,返回锁的节点路径 12 String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); 13 if (lockPath != null) {
//将当前线程的锁对象信息保存起来
14 LockData newLockData = new LockData(currentThread, lockPath); 15 threadData.put(currentThread, newLockData); 16 return true; 17 } 18 19 return false; 20 }

LockInternals.attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes)

 1     String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
 2         final long startMillis = System.currentTimeMillis();
 3         final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;//等待时间,毫秒
 4         final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
 5         int retryCount = 0;
 6 
 7         String ourPath = null;
 8         boolean hasTheLock = false;
 9         boolean isDOne= false;
10         while (!isDone) {
11             isDOne= true;
12 
13             try {
//在当前path下创建临时有序节点
14 ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
//判断是不是序号最小的节点,如果是返回true,否则阻塞等待
15 hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); 16 } catch (KeeperException.NoNodeException e) { 17 if (client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) { 18 isDOne= false; 19 } else { 20 throw e; 21 } 22 } 23 } 24 25 if (hasTheLock) { 26 return ourPath; 27 } 28 29 return null; 30 }

StandardLockInternalsDriver.createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes)

 1     @Override
 2     public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
 3         String ourPath;
//在zookeeper的指定路径上,创建一个临时序列节点。只是纯粹的创建了一个节点,并不是说线程已经持有了锁。  
4 if (lockNodeBytes != null) { 5 ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes); 6 } else { 7 ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path); 8 } 9 return ourPath; 10 }

LockInternals.internalLockLoop(long startMillis, Long millisToWait, String ourPath)

 1     private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
 2         boolean haveTheLock = false;
 3         boolean doDelete = false;
 4         try {
 5             if (revocable.get() != null) {
 6                 client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
 7             }
 8 
 9             while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
            //获得所有子节点
10 List children = getSortedChildren(); 11 String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash 12 13 PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
//判断是否是最小节点
14 if (predicateResults.getsTheLock()) { 15 haveTheLock = true; 16 } else { 17 String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); 18             //同步,是为了实现公平锁 19 synchronized (this) { 20 try {
//给比自己小的节点设置监听器
21 client.getData().usingWatcher(watcher).forPath(previousSequencePath); 22 if (millisToWait != null) { 23 millisToWait -= (System.currentTimeMillis() - startMillis); 24 startMillis = System.currentTimeMillis();
//等待超时,超时删除临时节点,超时时间是acquire方法传入的参数
25 if (millisToWait <= 0) { 26 doDelete = true; // timed out - delete our node 27 break; 28 } 29 //没有超时,继续等待 30 wait(millisToWait); 31 } else {
//如果等待时间==null,一直阻塞等待
32 wait(); 33 } 34 } catch (KeeperException.NoNodeException e) { 35 // it has been deleted (i.e. lock released). Try to acquire again 36 } 37 } 38 } 39 } 40 } catch (Exception e) { 41 ThreadUtils.checkInterrupted(e); 42 doDelete = true; 43 throw e; 44 } finally { 45 if (doDelete) { 46 deleteOurPath(ourPath);//如果如果抛出异常或超时,都会删除临时节点 47 } 48 } 49 return haveTheLock; 50 }

LockInternals.getSortedChildren()

 1     List getSortedChildren() throws Exception {
 2         return getSortedChildren(client, basePath, lockName, driver);
 3     }
 4 
 5     public static List getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception {
 6         try {
//获得basePath下所有子节点,进行排序
7 List children = client.getChildren().forPath(basePath); 8 List sortedList = Lists.newArrayList(children); 9 Collections.sort(sortedList, new Comparator() { 10 @Override 11 public int compare(String lhs, String rhs) { 12 return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName)); 13 } 14 }); 15 return sortedList; 16 } catch (KeeperException.NoNodeException ignore) { 17 return Collections.emptyList(); 18 } 19 }

StandardLockInternalsDriver.getsTheLock(CuratorFramework c, List child, String nodeName, int max)

 1     @Override
 2     public PredicateResults getsTheLock(CuratorFramework client, List children, String sequenceNodeName, int maxLeases) throws Exception {
 3         int ourIndex = children.indexOf(sequenceNodeName);
 4         validateOurIndex(sequenceNodeName, ourIndex);
 5 
 6         boolean getsTheLock = ourIndex < maxLeases;
 7         String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
 8 
 9         return new PredicateResults(pathToWatch, getsTheLock);
10     }

LockInternals.deleteOurPath(String ourPath)

1     private void deleteOurPath(String ourPath) throws Exception {
2         try {
3             client.delete().guaranteed().forPath(ourPath);
4         } catch (KeeperException.NoNodeException e) {
5             // ignore - already deleted (possibly expired session, etc.)
6         }
7     }

InterProcessMutex.release()

 1     @Override
 2     public void release() throws Exception {
 3         /*
 4             Note on concurrency: a given lockData instance
 5             can be only acted on by a single thread so locking isn't necessary
 6          */
 7 
 8         Thread currentThread = Thread.currentThread();
 9         LockData lockData = threadData.get(currentThread);
//如果当前线程没有持有锁,不能释放
10 if (lockData == null) { 11 throw new IllegalMonitorStateException("You do not own the lock: " + basePath); 12 } 13
//重入锁计数减一,如果还大于0,不能释放。直到所有重入业务完成,计数为0才能释放 14 int newLockCount = lockData.lockCount.decrementAndGet(); 15 if (newLockCount > 0) { 16 return; 17 } 18 if (newLockCount <0) { 19 throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath); 20 } 21 try { 22 internals.releaseLock(lockData.lockPath); 23 } finally { 24 threadData.remove(currentThread); 25 } 26 }

LockInternals.release()

1     final void releaseLock(String lockPath) throws Exception {
2         client.removeWatchers();//删除监听
3         revocable.set(null);
4         deleteOurPath(lockPath);//删除Path
5     }

 

 
 

推荐阅读
  • 本文介绍了在Python3中如何使用选择文件对话框的格式打开和保存图片的方法。通过使用tkinter库中的filedialog模块的asksaveasfilename和askopenfilename函数,可以方便地选择要打开或保存的图片文件,并进行相关操作。具体的代码示例和操作步骤也被提供。 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • 本文介绍了Python爬虫技术基础篇面向对象高级编程(中)中的多重继承概念。通过继承,子类可以扩展父类的功能。文章以动物类层次的设计为例,讨论了按照不同分类方式设计类层次的复杂性和多重继承的优势。最后给出了哺乳动物和鸟类的设计示例,以及能跑、能飞、宠物类和非宠物类的增加对类数量的影响。 ... [详细]
  • 如何实现织梦DedeCms全站伪静态
    本文介绍了如何通过修改织梦DedeCms源代码来实现全站伪静态,以提高管理和SEO效果。全站伪静态可以避免重复URL的问题,同时通过使用mod_rewrite伪静态模块和.htaccess正则表达式,可以更好地适应搜索引擎的需求。文章还提到了一些相关的技术和工具,如Ubuntu、qt编程、tomcat端口、爬虫、php request根目录等。 ... [详细]
  • GetWindowLong函数
    今天在看一个代码里头写了GetWindowLong(hwnd,0),我当时就有点费解,靠,上网搜索函数原型说明,死活找不到第 ... [详细]
  • Spring源码解密之默认标签的解析方式分析
    本文分析了Spring源码解密中默认标签的解析方式。通过对命名空间的判断,区分默认命名空间和自定义命名空间,并采用不同的解析方式。其中,bean标签的解析最为复杂和重要。 ... [详细]
  • PHP图片截取方法及应用实例
    本文介绍了使用PHP动态切割JPEG图片的方法,并提供了应用实例,包括截取视频图、提取文章内容中的图片地址、裁切图片等问题。详细介绍了相关的PHP函数和参数的使用,以及图片切割的具体步骤。同时,还提供了一些注意事项和优化建议。通过本文的学习,读者可以掌握PHP图片截取的技巧,实现自己的需求。 ... [详细]
  • 本文介绍了Oracle数据库中tnsnames.ora文件的作用和配置方法。tnsnames.ora文件在数据库启动过程中会被读取,用于解析LOCAL_LISTENER,并且与侦听无关。文章还提供了配置LOCAL_LISTENER和1522端口的示例,并展示了listener.ora文件的内容。 ... [详细]
  • 本文介绍了使用PHP实现断点续传乱序合并文件的方法和源码。由于网络原因,文件需要分割成多个部分发送,因此无法按顺序接收。文章中提供了merge2.php的源码,通过使用shuffle函数打乱文件读取顺序,实现了乱序合并文件的功能。同时,还介绍了filesize、glob、unlink、fopen等相关函数的使用。阅读本文可以了解如何使用PHP实现断点续传乱序合并文件的具体步骤。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • PHP中的单例模式与静态变量的区别及使用方法
    本文介绍了PHP中的单例模式与静态变量的区别及使用方法。在PHP中,静态变量的存活周期仅仅是每次PHP的会话周期,与Java、C++不同。静态变量在PHP中的作用域仅限于当前文件内,在函数或类中可以传递变量。本文还通过示例代码解释了静态变量在函数和类中的使用方法,并说明了静态变量的生命周期与结构体的生命周期相关联。同时,本文还介绍了静态变量在类中的使用方法,并通过示例代码展示了如何在类中使用静态变量。 ... [详细]
  • 标题: ... [详细]
  • 本文讨论了在手机移动端如何使用HTML5和JavaScript实现视频上传并压缩视频质量,或者降低手机摄像头拍摄质量的问题。作者指出HTML5和JavaScript无法直接压缩视频,只能通过将视频传送到服务器端由后端进行压缩。对于控制相机拍摄质量,只有使用JAVA编写Android客户端才能实现压缩。此外,作者还解释了在交作业时使用zip格式压缩包导致CSS文件和图片音乐丢失的原因,并提供了解决方法。最后,作者还介绍了一个用于处理图片的类,可以实现图片剪裁处理和生成缩略图的功能。 ... [详细]
  • 本文介绍了深入浅出Linux设备驱动编程的重要性,以及两种加载和删除Linux内核模块的方法。通过一个内核模块的例子,展示了模块的编译和加载过程,并讨论了模块对内核大小的控制。深入理解Linux设备驱动编程对于开发者来说非常重要。 ... [详细]
  • 本文介绍了在处理不规则数据时如何使用Python自动提取文本中的时间日期,包括使用dateutil.parser模块统一日期字符串格式和使用datefinder模块提取日期。同时,还介绍了一段使用正则表达式的代码,可以支持中文日期和一些特殊的时间识别,例如'2012年12月12日'、'3小时前'、'在2012/12/13哈哈'等。 ... [详细]
author-avatar
丁军东建宏
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有