Picasso源代码分析:4、任务的分发 Dispatcher

Picasso的源代码分析已经到了第4届,之前我们依次分析了Picasso,ReqeustCreaor,BitmapHunter,Picasso是Picasso框架的入口,同时也负责初始化各种工作的子线程,工作的实例;RequestCreator则是对任务的描述;而BitmapHunter则是一个具体的工作的线程,接受Picasso中配置好的调度池的调度同时负责知道对应的ReqeustHandler去执行数据请求,并将结果解析为图片,还可以进行一些图片的处理(Transformation)。但之前我们的分析中一直没有考虑到的一点是,在Picasso当中,任务到底是怎么调度的?任务到底怎么添加进来?添加进来之后都做了哪些工作?任务失败之后重试策略如何(Picasso目前还没有看到类似Volley那种明显的RetryPolicy)?所以,接下来,我们要分析DispatcherGithub

Dispatcher

首先,在Dispatcher中定义了很多的事件,如下:

  static final int REQUEST_SUBMIT = 1;
  static final int REQUEST_CANCEL = 2;
  static final int REQUEST_GCED = 3;
  static final int HUNTER_COMPLETE = 4;
  static final int HUNTER_RETRY = 5;
  static final int HUNTER_DECODE_FAILED = 6;
  static final int HUNTER_DELAY_NEXT_BATCH = 7;
  static final int HUNTER_BATCH_COMPLETE = 8;
  static final int NETWORK_STATE_CHANGE = 9;
  static final int AIRPLANE_MODE_CHANGE = 10;
  static final int TAG_PAUSE = 11;
  static final int TAG_RESUME = 12;
  static final int REQUEST_BATCH_RESUME = 13;

当Picasso.java中调用submit等方法的时候,会从Handler中获取相应的Message并发送给hanlder进行处理。因此我们首先要提出疑问:

  • handler工作在哪个线程成?
  • handler收到Message之后,其处理过程是怎么样?

先不着急,按照代码的顺序接着往下分析:

构造函数:

Dispatcher(Context context, ExecutorService service, Handler mainThreadHandler,
      Downloader downloader, Cache cache, Stats stats) {
    //任务分发线程,继承自ThreadHandler
    this.dispatcherThread = new DispatcherThread();
    this.dispatcherThread.start();
    this.context = context;
    this.service = service;
    //注意这个地方使用了LinkedHashMap,肯定要其特殊的考虑,暂时还不清楚。
    this.hunterMap = new LinkedHashMap<String, BitmapHunter>();
    //此处使用了两个WeakHashMap,也就说,一旦外部放弃对对应的Ojbect的引用的话,该Entry就会在GC时被回收。
    this.failedActions = new WeakHashMap<Object, Action>();
    this.pausedActions = new WeakHashMap<Object, Action>();
    //暂停的任务集合
    this.pausedTags = new HashSet<Object>();
    //这里就是处理分发任务的Handler,查看源代码可以知道,DispatcherHandler工作在dispatcherThread的线程之上。
    this.handler = new DispatcherHandler(dispatcherThread.getLooper(), this);
    this.downloader = downloader;
    //保留主线程的Handler的引用
    this.mainThreadHandler = mainThreadHandler;
    this.cache = cache;
    this.stats = stats;
    //一个列表,用来保存BitmapHunter的batch,但还不清楚为什么要限制size为4?
    this.batch = new ArrayList<BitmapHunter>(4);
    //是否为飞行模式
    this.airplaneMode = Utils.isAirplaneModeOn(this.context);
    //是否拥有检测网络状态变化的权限
    this.scansNetworkChanges = hasPermission(context, Manifest.permission.ACCESS_NETWORK_STATE);
    //注册网络状态的广播接收
    this.receiver = new NetworkBroadcastReceiver(this);
    receiver.register();
  }

在代码中我添加了注释,同时这个时候,我们就可以解释上面提到的第一个问题,handler是工作在哪个线程之上的?当然是dispatcherThread。

DispatcherThread是怎么样的呢?

static class DispatcherThread extends HandlerThread {
  DispatcherThread() {
    super(Utils.THREAD_PREFIX + DISPATCHER_THREAD_NAME, THREAD_PRIORITY_BACKGROUND);
  }
}

哈哈,更简单,其实就是一个HandlerThread,只是设置了Thread Name和优先级为BACKGROUND而已。

 shutdown()方法,关闭Dispatcher

  void shutdown() {
    // Shutdown the thread pool only if it is the one created by Picasso.
    // 关闭Picasso创建的线程池
    if (service instanceof PicassoExecutorService) {
      service.shutdown();
    }
    // 关闭downlaoder
    downloader.shutdown();
    // 退出dispatcherThread
    dispatcherThread.quit();
    // Unregister network broadcast receiver on the main thread.
    // 在主线程上注销receiver,注意和上面的Dispatcher构造函数区分,在构造函数中,没有特别说明需要在主线程上调用receiver.register(),
    Picasso.HANDLER.post(new Runnable() {
      @Override public void run() {
        receiver.unregister();
      }
    });
  }

有一个地方我们可以注意到的是,在构造函数中,广播接收器没有说明需要在主线程上运行,但关闭的时候,取消注册是在主线程上运行的,我们知道,广播的onReceive方法是在主线程上运行,但是广播的注册和取消注册,也必须要在主线程上进行吗?这个问题以后需要再讨论一下。

接下来我们去看看再Dispatcher中的Handler到底是如何处理各种消息的。

//Dispacher的处理函数
private static class DispatcherHandler extends Handler {
  private final Dispatcher dispatcher;

  // 根据Dispatcher中调用的构造函数DispatcherHandler(dispatcherThread.getLooper(), this)可以看出,
  // 该Handler其实是工作在DispatcherThread之上的。
  public DispatcherHandler(Looper looper, Dispatcher dispatcher) {
    super(looper);
    this.dispatcher = dispatcher;
  }

  // 根据不同的Message,调用dispatcher中对应的不同的方法。
  @Override public void handleMessage(final Message msg) {
    switch (msg.what) {
      case REQUEST_SUBMIT: {
        //Action提交之后,会调用dispatcher.performSubmit(action)函数。
        Action action = (Action) msg.obj;
        dispatcher.performSubmit(action);
        break;
      }
      case REQUEST_CANCEL: {
        Action action = (Action) msg.obj;
        dispatcher.performCancel(action);
        break;
      }
      case TAG_PAUSE: {
        Object tag = msg.obj;
        dispatcher.performPauseTag(tag);
        break;
      }
      case TAG_RESUME: {
        Object tag = msg.obj;
        dispatcher.performResumeTag(tag);
        break;
      }
      case HUNTER_COMPLETE: {
        BitmapHunter hunter = (BitmapHunter) msg.obj;
        dispatcher.performComplete(hunter);
        break;
      }
      case HUNTER_RETRY: {
        BitmapHunter hunter = (BitmapHunter) msg.obj;
        dispatcher.performRetry(hunter);
        break;
      }
      case HUNTER_DECODE_FAILED: {
        BitmapHunter hunter = (BitmapHunter) msg.obj;
        dispatcher.performError(hunter, false);
        break;
      }
      case HUNTER_DELAY_NEXT_BATCH: {
        dispatcher.performBatchComplete();
        break;
      }
      case NETWORK_STATE_CHANGE: {
        NetworkInfo info = (NetworkInfo) msg.obj;
        dispatcher.performNetworkStateChange(info);
        break;
      }
      case AIRPLANE_MODE_CHANGE: {
        dispatcher.performAirplaneModeChange(msg.arg1 == AIRPLANE_MODE_ON);
        break;
      }
      default:
        Picasso.HANDLER.post(new Runnable() {
          @Override public void run() {
            throw new AssertionError("Unknown handler message received: " + msg.what);
          }
        });
    }
  }
}

其实很简单,构造函数中,将Handler与DispatcherThread绑定在一起,所有的消息都在DispatcherThread中进行处理。那么Dispatcher具体是怎么处理各种请求的呢?这可是Picasso怎么去处理一个任务的核心内容,我们来看一下依次的处理函数:

performSubmit(Action action, boolean dismissFailed)

//执行提交的action
void performSubmit(Action action, boolean dismissFailed) {
  //如果pausedTags包含了对应的action
  if (pausedTags.contains(action.getTag())) {
    //那么将action放入到pausedActions当中(我擦,还有暂停功能)
    pausedActions.put(action.getTarget(), action);
    if (action.getPicasso().loggingEnabled) {
      log(OWNER_DISPATCHER, VERB_PAUSED, action.request.logId(),
          "because tag '" + action.getTag() + "' is paused");
    }
    return;
  }

  // 看看该任务是否已经添加过。
  BitmapHunter hunter = hunterMap.get(action.getKey());
  if (hunter != null) {
    hunter.attach(action);
    return;
  }

  //服务终止。。。
  if (service.isShutdown()) {
    if (action.getPicasso().loggingEnabled) {
      log(OWNER_DISPATCHER, VERB_IGNORED, action.request.logId(), "because shut down");
    }
    return;
  }

  //根据任务的uri,找到真正可以执行该任务的对应的Handler,重新封装为一个BitmapHunter
  hunter = forRequest(action.getPicasso(), this, cache, stats, action);
  //提交任务,并且获取一个future
  hunter.future = service.submit(hunter);
  //将提交过的任务放入hunterMap当中
  hunterMap.put(action.getKey(), hunter);
  //忽略失败标志。
  if (dismissFailed) {
    failedActions.remove(action.getTarget());
  }

  if (action.getPicasso().loggingEnabled) {
    log(OWNER_DISPATCHER, VERB_ENQUEUED, action.request.logId());
  }
}

我画了一个流程图

 

 

Disaptcher-performSubmit

 

简单的理解,就是根据Action创建BitmapHunter,前面我们已经看过,BitmapHunter为一个线程,因此将该线程提交到PicassoExecutorService中执行,获取对应的future,这样即可。

performCancel(Action action) 

performCancel的逻辑和performSubmit思路大体上相同的,只是其逆序而已,因此我们直接在代码中分析其业务逻辑来看看。

void performCancel(Action action) {
  // 获取对应的key
  String key = action.getKey();
  // 从hunterMap中获取对应的BitmapHunter
  BitmapHunter hunter = hunterMap.get(key);
  // 如果hunter存在,则调用hunter.detach(action)和hunter.cancel()删除任务
  if (hunter != null) {
    // 将对应的action从hunter当中删除
    hunter.detach(action);
    // 如果返回true,那么以为这已经没有对应的actions,并且future也通过cancel()函数取消掉了
    if (hunter.cancel()) {
      // 该BitmapHunter不会用到了,删除
      hunterMap.remove(key);
      if (action.getPicasso().loggingEnabled) {
        log(OWNER_DISPATCHER, VERB_CANCELED, action.getRequest().logId());
      }
    }
  }

  // 如果暂停的pasusedTags中还有对应的action
  if (pausedTags.contains(action.getTag())) {
    // 从队列中删除
    pausedActions.remove(action.getTarget());
    if (action.getPicasso().loggingEnabled) {
      log(OWNER_DISPATCHER, VERB_CANCELED, action.getRequest().logId(),
          "because paused request got canceled");
    }
  }

  // 从失败列表中删除对应的action
  Action remove = failedActions.remove(action.getTarget());
  if (remove != null && remove.getPicasso().loggingEnabled) {
    log(OWNER_DISPATCHER, VERB_CANCELED, remove.getRequest().logId(), "from replaying");
  }
}

上面是performCancel(Action action)的代码,其逻辑也比较简单,就是各种队列中删除对应的action信息。

performPauseTag(Object tag)

void performPauseTag(Object tag) {
  // Trying to pause a tag that is already paused.
  // 如果返回true,说明添加成功,set已经修改,如果false,说明该tag已经在pasuedTags中,也就是说,任务已经暂停过了。
  if (!pausedTags.add(tag)) {
    return;
  }

  // Go through all active hunters and detach/pause the requests
  // that have the paused tag.
  // 遍历全部正在执行的hunter们,如果BitmapHunter对应的Action有相应的pausedTag,那么就暂定对应的BitmapHunter
  for (Iterator<BitmapHunter> it = hunterMap.values().iterator(); it.hasNext();) {
    BitmapHunter hunter = it.next();
    boolean loggingEnabled = hunter.getPicasso().loggingEnabled;

    // 获取hunter对应的action,已经后来detached的actions。
    Action single = hunter.getAction();
    List<Action> joined = hunter.getActions();
    // 如果joined不为空,并且有元素,那么说明该BitmapHunter拥有多个Action同时附件在上面
    boolean hasMultiple = joined != null && !joined.isEmpty();

    // Hunter has no requests, bail early. 此处说明BitmapHunter没有请求。
    if (single == null && !hasMultiple) {
      continue;
    }

    // 如果下面判断为真,说明需要暂停对应的Action
    if (single != null && single.getTag().equals(tag)) {
      // 从hunter中detach该Action
      hunter.detach(single);
      // 将该Action放入pausedActions队列当中
      pausedActions.put(single.getTarget(), single);
      if (loggingEnabled) {
        log(OWNER_DISPATCHER, VERB_PAUSED, single.request.logId(),
            "because tag '" + tag + "' was paused");
      }
    }
    
    // 如果BitmapHunter还有附加的其他的Actions,那么还需要依次检查他们。
    if (hasMultiple) {
      for (int i = joined.size() - 1; i >= 0; i--) {
        Action action = joined.get(i);
        if (!action.getTag().equals(tag)) {
          continue;
        }

        hunter.detach(action);
        pausedActions.put(action.getTarget(), action);
        if (loggingEnabled) {
          log(OWNER_DISPATCHER, VERB_PAUSED, action.request.logId(),
              "because tag '" + tag + "' was paused");
        }
      }
    }

    // Check if the hunter can be cancelled in case all its requests
    // had the tag being paused here.
    if (hunter.cancel()) {
      it.remove();
      if (loggingEnabled) {
        log(OWNER_DISPATCHER, VERB_CANCELED, getLogIdsForHunter(hunter), "all actions paused");
      }
    }
  }
}

有个地方比较有意思,需要仔细考虑的:此处的tag并不是之前我们说的cacheKey,cacheKey一般是根据图片的网址或者大小而设定的,一般用于缓存等情况;而此处的tag则一般是用来暂定恢复任务,那什么时候要暂定或者恢复任务呢? 我进入一个Activity,要使用Picasso显示一个图片,但我希望在Activity退出的时候,该图片只是暂停,因为我后面很快还要用到,那这个时候就应该使用恢复和暂停,那Tag呢?这种情况下使用Activity的名字作为tag就完全可以。

 performResumeTag(Object tag)

void performResumeTag(Object tag) {
  // Trying to resume a tag that is not paused.
  // 尝试恢复tag对应的暂停的请求
  if (!pausedTags.remove(tag)) {
    return;
  }

  // 将同一个tag对应的Action放在一个List当中
  List<Action> batch = null;
  for (Iterator<Action> i = pausedActions.values().iterator(); i.hasNext();) {
    Action action = i.next();
    if (action.getTag().equals(tag)) {
      if (batch == null) {
        batch = new ArrayList<Action>();
      }
      batch.add(action);
      i.remove();
    }
  }

  if (batch != null) {
    // 通知主线程进行处理
    mainThreadHandler.sendMessage(mainThreadHandler.obtainMessage(REQUEST_BATCH_RESUME, batch));
  }
}

执行恢复任务的处理函数,其实这段代码比较简单,就做了两件事情:

  1. 从pausedActions中找到所有的tag对应的Action,将其放入到batch的List当中。
  2. 该batch通过主线程的Handler发送到主线程上去处理。

这个和其他的就明显不一样啊,其他的任务处理函数,一般在DispatcherThread线程上就处理完成了,那这个为什么要发送到主线程上去处理呢?还是先去看看再主线程的HANDLER上,接收到这个Message之后,到底做了什么工作?相关的代码片段:

  case REQUEST_BATCH_RESUME:
      //请求被恢复了,还不太懂其业务逻辑。
    @SuppressWarnings("unchecked") List<Action> batch = (List<Action>) msg.obj;
    for (int i = 0, n = batch.size(); i < n; i++) {
      Action action = batch.get(i);
      action.picasso.resumeAction(action);
    }
    break;

之前的时候还不清楚这个是怎么的逻辑,现在清晰一些,其实是在主线程上,依次通过Picasso中的resumeAction(action)函数将其唤醒。那resumeAction(action)又做了什么工作呢? 在Picasso中我们有分析过这段代码的作用,我们现在再回过头来看一下。

void resumeAction(Action action) {
  Bitmap bitmap = null;
  if (!action.skipCache) {
    bitmap = quickMemoryCacheCheck(action.getKey());
  }

  if (bitmap != null) {
    // Resumed action is cached, complete immediately.
    deliverAction(bitmap, MEMORY, action);
    if (loggingEnabled) {
      log(OWNER_MAIN, VERB_COMPLETED, action.request.logId(), "from " + MEMORY);
    }
  } else {
    // Re-submit the action to the executor.
    enqueueAndSubmit(action);
    if (loggingEnabled) {
      log(OWNER_MAIN, VERB_RESUMED, action.request.logId());
    }
  }
}

现在看就简单明了,同样的两步工作:

  1. 尝试从Cache中获取Bitmap,如果有,则直接通过deliverAction(bitmap, MEMORY, action)派发相应。
  2. 如果没有,那么说明需要重新执行请求,将action添加到队列当中,并提交。

performRetry(BitmapHunter hunter)

void performRetry(BitmapHunter hunter) {
  if (hunter.isCancelled()) return;

  if (service.isShutdown()) {
    performError(hunter, false);
    return;
  }

  // 检查网络状态
  NetworkInfo networkInfo = null;
  if (scansNetworkChanges) {
    ConnectivityManager connectivityManager = getService(context, CONNECTIVITY_SERVICE);
    networkInfo = connectivityManager.getActiveNetworkInfo();
  }

  // 是否有网络连接
  boolean hasConnectivity = networkInfo != null && networkInfo.isConnected();
  // 根据当前的网络状态和BitmapHunter的重试次数,来判断是否需要重试
  boolean shouldRetryHunter = hunter.shouldRetry(airplaneMode, networkInfo);
  // BitmapHunter中保存的Action对应的RequestHandler是否支持重试?
  boolean supportsReplay = hunter.supportsReplay();

  if (!shouldRetryHunter) {
    // Mark for replay only if we observe network info changes and support replay.
    // 如果BitmapHunter的重试次数已经用完,还是要根据当前的网络状态看看是否还让其重新尝试
    boolean willReplay = scansNetworkChanges && supportsReplay;
    // 通知执行performError
    performError(hunter, willReplay);
    // 如果网络状态变化,并且RequestHandler支持重试,那么就重新尝试
    if (willReplay) {
      // 标记以重新尝试 ,调用markForReplay(hunter),实际上是将对应的Action放入到failedActions当中。
      markForReplay(hunter);
    }
    return;
  }

  // If we don't scan for network changes (missing permission) or if we have connectivity, retry.
  // 此时对于BitmapHunter来说,支持网络重试,如果受权限限制无法检查网络状态,或者是网络连接,那么直接重试。
  if (!scansNetworkChanges || hasConnectivity) {
    if (hunter.getPicasso().loggingEnabled) {
      log(OWNER_DISPATCHER, VERB_RETRYING, getLogIdsForHunter(hunter));
    }
    // 向service提交该hunter。
    hunter.future = service.submit(hunter);
    return;
  }

  performError(hunter, supportsReplay);

  // 如果支持重试,调用markForReplay(hunter),实际上是将对应的Action放入到failedActions当中。
  if (supportsReplay) {
    markForReplay(hunter);
  }
}

这里面主要做了以下的事情:

  1. 检查对应的BtimapHunter是否还有重试机会,如果有,那么如果网络又畅通,则直接将对应的BitmapHunter提交到service当中
  2. 如果BitmapHunter没有重试机会,已经用完了,那么就检查下对应的Handler是否支持重试,如果可以的话,则将其放入到failedActions队列当中。

 

performComplete(BitmapHunter hunter) && performBatchComplete()

当BitmapHunter执行完成之后,会给Dispatcher的Handler发送Message,Handler的处理函数会调用performComplete(BitmapHunter hunter)函数进行处理。其代码如下:

void performComplete(BitmapHunter hunter) {
  // 是否要跳过Memory Cache
  if (!hunter.shouldSkipMemoryCache()) {
    // 不跳过的话,则将hunter的key和结果放入Cache当中Result为对应的图像
    cache.set(hunter.getKey(), hunter.getResult());
  }
  // 从hunterMap中删除该hunter
  hunterMap.remove(hunter.getKey());
  // 打包,看说明是将hunter放到batch的list当中,并检查如果没有HUNTER_DELAY_NEXT_BATCH,则200ms之后,发送该Message
  // handler处理该Message,其实实际上直接调用performBatchComplete()
  batch(hunter);
  if (hunter.getPicasso().loggingEnabled) {
    log(OWNER_DISPATCHER, VERB_BATCHED, getLogIdsForHunter(hunter), "for completion");
  }
}

void performBatchComplete() {
  // 打包处理,同时取出batch所有的BitmapHunter,并发送给主线程Handler进行处理。
  List<BitmapHunter> copy = new ArrayList<BitmapHunter>(batch);
  batch.clear();
  mainThreadHandler.sendMessage(mainThreadHandler.obtainMessage(HUNTER_BATCH_COMPLETE, copy));
  logBatch(copy);
}

Picasso对BitmapHunter执行的任务结果的分发采用了打包的机制进行处理,换句话说,所有的BitmapHunter执行完成之后,会调用performComplete(BitmapHunter hunter),在该方法中,Picasso将hunter放入batch列表中,然后在Handler中的Messages中是否有HUNTER_DELAY_NEXT_BATCH等待执行,如果没有,那么则等200ms之后再分发,而此时hunter已经加入到batch当中。200ms之后,Handler接收到HUNTER_DELAY_NEXT_BATCH的Message,则将batch中所有的BitmapHunter的结果一起进行处理。

我们接着看看主线程的HANDLER接收到HUNTER_BATCH_COMPLETE的消息的时候,是如何处理的:

  case HUNTER_BATCH_COMPLETE: {
            //此处应该是和Volley一样,打包处理的图形库,因为这些图片可能地址以及大小都是一样的所有打包在一起进行处理。
    @SuppressWarnings("unchecked") List<BitmapHunter> batch = (List<BitmapHunter>) msg.obj;
    //noinspection ForLoopReplaceableByForEach
    //依次进行分发
    for (int i = 0, n = batch.size(); i < n; i++) {
      BitmapHunter hunter = batch.get(i);
      hunter.picasso.complete(hunter);
    }
    break;

其实就是依次调用了Picasso.complete(BitmapHunter)方法,继续查看,则是调用了Action的

void complete(Bitmap result, Picasso.LoadedFrom from);

好了,到现在位置,Dispatcher我们也学习完毕了,下一篇文章中,会对Picasso的整体框架做一个梳理。

 

Tips:

Volley的访问如果不成功的话,是有惩罚系数的,但Picasso默认现在看可能只是设置了RETRY_DELAY=500;BATCH_DELAY=200;

About: happyhls


发表评论

电子邮件地址不会被公开。 必填项已用*标注