Volley源代码分析 – 2:任务的执行,CacheDispatcher、NetworkDispatcher、ResponseDelivery

首先,重温Volley框架图:

volley-arch

我们所有的代码都是围绕这个框架来分析。所以为什么现在来看看CacheDispatcher和NetworkDispatcher?

  • 从上面的框架图可以看出,大部分任务的执行都是通过CacheDispatcher和NetworkDispatcher来分发的。
  • 从RequestQueue的代码入手,启动了RequestQueue之后(调用RequestQueue的start()方法),主要做了两件事情,启动1个CacheDispatcher线程,启动多个NetworkDispatcher线程。

这里是RequestQueue的start()代码:

    public void start() {
        stop();  // Make sure any currently running dispatchers are stopped. 确保当前正在运行的调度器已经停止。
        // Create the cache dispatcher and start it. 创建调度器,并且启动它
        mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery);
        mCacheDispatcher.start();

        // Create network dispatchers (and corresponding threads) up to the pool size.
        // 创建网络的调度器,最高启动线程池大小个调度器。
        for (int i = 0; i < mDispatchers.length; i++) {
            NetworkDispatcher networkDispatcher = new NetworkDispatcher(mNetworkQueue, mNetwork,
                    mCache, mDelivery);
            mDispatchers[i] = networkDispatcher;
            networkDispatcher.start();
        }
    }

所以说,从哪个角度讲,都应该分析CacheDispatcher和NetworkDispatcher。

CacheDispatcher

刚刚看RequestQueue的start()代码,看到mCacheDispatcher.start()的调用,其实就应该想到,这必然是个线程啊,当然,从一开始框架图更应该看出来。所以初始化之类的我们不需要关心,只要看一下其run()方法里面,都做了一些什么样的工作?

    @Override
    public void run() {
        if (DEBUG) VolleyLog.v("start new dispatcher");
        //设定优先级会后台优先级。
        Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);

        // Make a blocking call to initialize the cache.
        // 阻塞调用,初始化cache
        mCache.initialize();

        while (true) {
            try {
                // Get a request from the cache triage queue, blocking until
                // at least one is available.
                // 从cache中获取一个待分类的cache,采用了blocking的队列,会一直阻塞直到有可用的位置。
                final Request<?> request = mCacheQueue.take();
                request.addMarker("cache-queue-take");

                // If the request has been canceled, don't bother dispatching it.
                // 检查请求是否已经被取消,如果是的话,则放弃该请求,重新获取新的进行调度。
                if (request.isCanceled()) {
                    request.finish("cache-discard-canceled");
                    continue;
                }

                // Attempt to retrieve this item from cache.
                // 尝试从Cache中获取数据
                Cache.Entry entry = mCache.get(request.getCacheKey());
                if (entry == null) {
                    // 从Cache中获取数据失败,意味着cache miss,需要从网络中重新获取
                    request.addMarker("cache-miss");
                    // Cache miss; send off to the network dispatcher.
                    // 将请求添加到网络任务队列中。
                    mNetworkQueue.put(request);
                    continue;
                }

                // If it is completely expired, just send it to the network.
                // 此处和上面情况类似,但是不是cache缺失,而是cache过期
                if (entry.isExpired()) {
                    request.addMarker("cache-hit-expired");
                    // 更新之前,首先将数据保存一份。
                    request.setCacheEntry(entry);
                    // 将Request添加到网络任务队列当中。
                    mNetworkQueue.put(request);
                    continue;
                }

                // We have a cache hit; parse its data for delivery back to the request.
                // 到此处已经检查过Cache是否确实,Cache是否过期,此时说明数据从Cache中取回即可。
                request.addMarker("cache-hit");
                // 首先将cache中的raw数据进行解析。
                Response<?> response = request.parseNetworkResponse(
                        new NetworkResponse(entry.data, entry.responseHeaders));
                request.addMarker("cache-hit-parsed");

                if (!entry.refreshNeeded()) {
                    // Completely unexpired cache hit. Just deliver the response.
                    // 此时数据不需要更新,直接将数据分发出去。
                    mDelivery.postResponse(request, response);
                } else {
                    // Soft-expired cache hit. We can deliver the cached response,
                    // but we need to also send the request to the network for
                    // refreshing.
                    // 此时数据虽然说是cache命中了,但数据需要进行更新。
                    request.addMarker("cache-hit-refresh-needed");
                    request.setCacheEntry(entry);

                    // Mark the response as intermediate.
                    // 标记该response为一个中间结果,以后还会需要更新
                    response.intermediate = true;

                    // Post the intermediate response back to the user and have
                    // the delivery then forward the request along to the network.
                    // 将中间结果返回给用户,并且将请求转发给网络层。
                    // 但不清楚为什么要将添加到网络队列的过程放在其他线程中去做-->查看该方法的签名,该方法是首先将结果传递给用户,然后再执行Runnable
                    mDelivery.postResponse(request, response, new Runnable() {
                        @Override
                        public void run() {
                            try {
                                mNetworkQueue.put(request);
                            } catch (InterruptedException e) {
                                // Not much we can do about this.
                            }
                        }
                    });
                }

            } catch (InterruptedException e) {
                // We may have been interrupted because it was time to quit.
                if (mQuit) {
                    return;
                }
                continue;
            }
        }
    }

和前面的文章一样,我们还是先来画一张流程图来分析一下:

CacheDispatcher.run

 

这一部分就是在CacheDispatcher的线程中循环执行的部分,我们可以看到Cachedispatcher主要做了以下的工作:

  1. 从阻塞队列中取出一个需要处理的Request,如果没有,则一直等待。
  2. 取出之后,首先判断该Request是否已经被取消了,如果是的话,则调用Request.finish()方法取消相应的Request。并重新获取下一个需要执行的Request。
  3. 尝试从cache中获取cacheKey对应的entry。如果entry是null,则没有取到entry,需要放到网络任务中执行。
  4. 此时entry必然是有数据的,再一步判断entry是否在有效期,如果已经过期,则保留一份数据,然后提交到网络的任务队列当中执行。
  5. 此时entry是有效的而且没有过期,所以对结果进行解析。
  6. 根据entry中的数据判断数据是否需要刷新了,有的时候,这些数据现在还是有效,但可能过很短的时间就过期,那么在后面的操作中就记得需要刷新这些数据。
  7. 如果不需要刷新,就直接通过mDelivery分发结果
  8. 如果需要刷新,则设置Cache的数据备份,然后设置需要刷新的标签
  9. 调用postReqonse中带有Runnalbe参数的那个方法,首先将结果进行分发,分发完成之后则将该任务添加到网络的任务队列当中。

总结就是,首先尝试获取Cache对应的数据,如果没有数据,则扔到mNetworkQueue中等待执行,有的话,则判断是否过期,是否后续需要刷新,再分发结果,或者继续放进mNetworkQueue中即可。

NetworkDispatcher

同样的办法,我们继续分析一下NetworkDispatcher的工作原理,首先上源代码:

 

    public void run() {
        //设置进程的优先级为后台进程。
        Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
        while (true) {
            Request<?> request;
            try {
                // Take a request from the queue.
                // 从任务队列中获取一个请求
                request = mQueue.take();
            } catch (InterruptedException e) {
                // We may have been interrupted because it was time to quit.
                if (mQuit) {
                    return;
                }
                continue;
            }

            try {
                //添加日志
                request.addMarker("network-queue-take");

                // If the request was cancelled already, do not perform the
                // network request.
                // 如果该请求已经被取消,那么就不再处理
                if (request.isCanceled()) {
                    request.finish("network-discard-cancelled");
                    continue;
                }
                //添加流量统计信息标记。
                addTrafficStatsTag(request);

                // Perform the network request.
                // 从网络中获取请求数据
                NetworkResponse networkResponse = mNetwork.performRequest(request);
                request.addMarker("network-http-complete");

                // If the server returned 304 AND we delivered a response already,
                // we're done -- don't deliver a second identical response.
                // 如果服务器已经返回了304,并且我们已经进行过同样的请求,那么就不需要再处理
                // HTTP 304 =  如果客户端发送了一个带条件的 GET 请求且该请求已被允许,而文档的内容(自上次访问以来或者根据请求的条件)并没有改变,则服务器应当返回这个状态码。
                if (networkResponse.notModified && request.hasHadResponseDelivered()) {
                    request.finish("not-modified");
                    continue;
                }

                // Parse the response here on the worker thread.
                // 在工作线程中,对网络请求返回的数据进行解析
                Response<?> response = request.parseNetworkResponse(networkResponse);
                request.addMarker("network-parse-complete");

                // Write to cache if applicable.
                // TODO: Only update cache metadata instead of entire record for 304s.
                // 如果需要,则写入cache。注意,对于304,则只写入元数据,而非整个全部数据
                if (request.shouldCache() && response.cacheEntry != null) {
                    mCache.put(request.getCacheKey(), response.cacheEntry);
                    request.addMarker("network-cache-written");
                }

                // Post the response back.
                // 标记该请求已经分发过。
                request.markDelivered();
                // 将解析的结果分发给main线程。
                mDelivery.postResponse(request, response);
            } catch (VolleyError volleyError) {
                parseAndDeliverNetworkError(request, volleyError);
            } catch (Exception e) {
                VolleyLog.e(e, "Unhandled exception %s", e.toString());
                mDelivery.postError(request, new VolleyError(e));
            }
        }
    }

同样的,我们也画一张流程图来分析一下NetworkDispatcher都做了什么事情。

NetworkDispatcher.run

从上图我们可以看出,NetworkDispatcher的run()方法中所做的事情和CacheDispatcher所做的事情基本上是类似的。

  1. 首先从阻塞队列中获取一个需要处理的Request。如果没有可以处理的Request,则阻塞等待。
  2. 判断这个request是否已经被取消了,如果取消了,则调用request的finish()方法。
  3. 添加流量统计
  4. 通过网络访问,获取网络访问的返回结果
  5. 如果说数据没有修改,而且数据结果已经被分发过,那么就可以终止该任务。
  6. 新的数据,那么需要解析数据
  7. 判断是否需要cache,如果需要,则写入cache,如果不需要则跳过。
  8. 标记该请求已经分发过,同时通过mDevliery派发结果。

ResponseDelivery

通过分析上面的代码,我们可以发现,所有的操作的最后,都要讲Response交给mDelivery的postResponse方法当中,也就是要将结果分发出去。那么具体是怎么实现的呢?

ResponseDelivery的源代码如下:

public interface ResponseDelivery {
    /**
     * Parses a response from the network or cache and delivers it.
     * 
     * 解析从网络或者cache获取的相应,然后派发结果
     */
    public void postResponse(Request<?> request, Response<?> response);

    /**
     * Parses a response from the network or cache and delivers it. The provided
     * Runnable will be executed after delivery.
     * 
     * 解析从网络或者cache获取的相应,然后派发结果,提供的Runnable将在派发之后执行。
     */
    public void postResponse(Request<?> request, Response<?> response, Runnable runnable);

    /**
     * Posts an error for the given request.
     * 
     * 对给定的request返回error。
     */
    public void postError(Request<?> request, VolleyError error);
}

从上我们可以看出,ResponseDelivery是一个接口,其中共有三个方法签名,实际上前两种是做的同一件事情,只是签名不同而已。在Volley中,有一个默认的实现,基本上我们也不会去修改它,就是ExecutorDelivery。

该方法共有两个构造函数,分别是

    public ExecutorDelivery(final Handler handler) {
        // Make an Executor that just wraps the handler.
        mResponsePoster = new Executor() {
            @Override
            public void execute(Runnable command) {
                //默认情况下,这段代码的调用位于调用者的线程之上。
                handler.post(command);
            }
        };
    }

    public ExecutorDelivery(Executor executor) {
        mResponsePoster = executor;
    }

可以看出,一共有两个构造函数,一个其参数为Executor,即调用者传递给他一个executor,另外一种是传递一个handler进来,然后在构造函数中定义一个继承了Executor的匿名类,在execute(Runnable command)方法中,将command发送给handler到对应的线程上去处理。那么这个线程是什么呢?

我们跟踪代码,发现在Volley内部,只有对第一个构造函数,即

public ExecutorDelivery(final Handler handler);

是已经使用了的,在RequestQueue的构造函数当中:

    public RequestQueue(Cache cache, Network network, int threadPoolSize) {
        this(cache, network, threadPoolSize,
                new ExecutorDelivery(new Handler(Looper.getMainLooper())));
    }

在RequestQueue当中,实例化了一个ExecutorDelivery,需要注意的是,此时传入ExecutorDelivery构造函数所对应的Looper为Looper.getMainLooper(),也就是说,handler发送的消息都是在主线程即UI线程上进行处理的。这也就可以解释了之前我们看Android当中Volley的文档,其中一直在强调,所有的结果的分发都是在主线程上完成的。

我们继续查看源代码,看看ExecutorDelivery是怎么样来分发结果的。其实现的ReuestDelivery的接口的实现为:

    @Override
    public void postResponse(Request<?> request, Response<?> response) {
        postResponse(request, response, null);
    }

    @Override
    public void postResponse(Request<?> request, Response<?> response, Runnable runnable) {
        request.markDelivered();
        request.addMarker("post-response");
        mResponsePoster.execute(new ResponseDeliveryRunnable(request, response, runnable));
    }

    @Override
    public void postError(Request<?> request, VolleyError error) {
        request.addMarker("post-error");
        Response<?> response = Response.error(error);
        mResponsePoster.execute(new ResponseDeliveryRunnable(request, response, null));
    }

可以发现其主要做了两件事情:

  1. 调用request.markDelivered()来标记该request已经分发过了。
  2. 将一个Runnalbe对象实际上交给了主线程的handler去执行。

因此我们继续看看这个Runnable都做了哪些工作,该Runnable的代码如下:

    @SuppressWarnings("rawtypes")
    private class ResponseDeliveryRunnable implements Runnable {
        private final Request mRequest;
        private final Response mResponse;
        private final Runnable mRunnable;

        public ResponseDeliveryRunnable(Request request, Response response, Runnable runnable) {
            mRequest = request;
            mResponse = response;
            mRunnable = runnable;
        }

        @SuppressWarnings("unchecked")
        @Override
        public void run() {
            // If this request has canceled, finish it and don't deliver.
            // 如果request已经被取消,那么就不需要派发结果。
            if (mRequest.isCanceled()) {
                mRequest.finish("canceled-at-delivery");
                return;
            }

            // Deliver a normal response or error, depending.
            // 如果Response解析成功,那么将任务派发给Request。
            if (mResponse.isSuccess()) {
                mRequest.deliverResponse(mResponse.result);
            } else {
                mRequest.deliverError(mResponse.error);
            }

            // If this is an intermediate response, add a marker, otherwise we're done
            // and the request can be finished.
            // 如果是中间结果,那么就添加标记,否则则调用finish命令,结束该任务。
            if (mResponse.intermediate) {
                mRequest.addMarker("intermediate-response");
            } else {
                mRequest.finish("done");
            }

            // If we have been provided a post-delivery runnable, run it.
            // 此时任务已经派发完成,如果传入了其他的Runnable,那么就调用该Runnable。
            if (mRunnable != null) {
                mRunnable.run();
            }
       }
    }

代码比较简单,也不需要去画什么流程图了。

  1. 检查该mRequest是否已经被取消了,如果已经取消了,那么就调用Request的finish()方法,进而调用RequestQueue的finish()方法删除该request。
  2. 根据该mRequest的执行结果,即mResponse.isSuccess()结果来调用相应的mRequest.deliverResponse()或者mRequest.deliverError()方法。
  3. 检查该mRequest对应的执行结果mResponse.intermeidate来判断该任务是否还有后面可能存在的更新,因为这时候标示mResponse的结果是暂时的,需要更新的。如果不需要更新的话那么,那么调用mRequst.finish()方法来结束该Request。
  4. 以上工作完成之后,如果还设置了需要执行的mRunnable,则调用该Runnable,需要注意的是,此时并不会开辟新的线程去执行其中的代码,而是直接运行在主线程上,所以我们必须要注意该Runnable代码中的工作量,避免ANR。

综上,CacheDispatcher、NetworkDispatcher、ResponseDelivery就分析完了,后面可能会继续分析下Volley其他设计比较巧妙的边边角角的代码。

About: happyhls


发表评论

电子邮件地址不会被公开。 必填项已用*标注