微软关于CancellationTokenSource的介绍很简单,其实CancellationTokenSource的使用也很简单,但是实现就不是那么简单了,我们首先来看看CancellationTokenSource的实现:
public class CancellationTokenSource : IDisposable{ private const int CANNOT_BE_CANCELED = 0; private const int NOT_CANCELED = 1; private const int NOTIFYING = 2; private const int NOTIFYINGCOMPLETE = 3; private volatile int m_state; private static readonly Action
CancellationTokenSource的实现相对比较复杂,我们首先看看CancellationTokenSource的构造函数,默认构造函数将会设置【m_state = NOT_CANCELED】,我们也可以构造一个特定时间后就自动Cancel的CancellationTokenSource,自动Cancel是依赖一个Timer实例,在Timer到指定时间后调用CancellationTokenSource的Cancel方法【这里是在TimerCallbackLogic里面调用Cancel方法】,CancelAfter方法的实现也是依赖这个Timer实例和TimerCallbackLogic方法。
现在我们来看看CancellationTokenSource最主要的一个方法Cancel,Cancel方法调用NotifyCancellation方法,NotifyCancellation方法主要调用ExecuteCallbackHandlers【从这个方法的名称可以猜测到主要是调用回调方法】,在ExecuteCallbackHandlers方法里面用到一个变量m_registeredCallbacksLists,它是SparselyPopulatedArray<CancellationCallbackInfo>[]结构,【可以理解为是一个链表的数组,数组每个元素是一个链表,链表里面的每个节点都可以访问下一个节点】,我们遍历这个链表数组的每一个节点,检查节点是否有值,即m_executingCallback != null,然后调用回调方法,如果回调方法的TargetSyncContext不为空,调用CancellationCallbackCoreWork_OnSyncContext方法,否则调用CancellationCallbackCoreWork方法【CancellationCallbackCoreWork_OnSyncContext里面也是调用它】,CancellationCallbackCoreWork方法是调用CancellationCallbackInfo的ExecuteCallback。
CancellationTokenSource有两个CreateLinkedTokenSource方法【可以理解为创建与指定的CancellationToken相关联的CancellationTokenSource】,其主要实现是CancellationToken的Register方法。
/// <summary>
/// A value-type handle over a <see cref="CancellationTokenSource"/>. Every query and
/// registration on the token is forwarded to the underlying source; a token whose
/// source is null can never be canceled.
/// </summary>
public struct CancellationToken
{
    // The source this token observes. Null means "can never be canceled"
    // (see CanBeCanceled / IsCancellationRequested below).
    private CancellationTokenSource m_source;

    /// <summary>Creates a token bound to the given source.</summary>
    /// <param name="source">The source to observe; stored as-is (may be null).</param>
    internal CancellationToken(CancellationTokenSource source)
    {
        m_source = source;
    }

    /// <summary>
    /// Creates a token in a fixed state. When <paramref name="canceled"/> is true the token is
    /// bound to the source returned by <c>CancellationTokenSource.InternalGetStaticSource</c>;
    /// otherwise the source stays null and the token can never be canceled.
    /// </summary>
    /// <param name="canceled">Whether the token should start out canceled.</param>
    public CancellationToken(bool canceled) : this()
    {
        if (canceled)
            m_source = CancellationTokenSource.InternalGetStaticSource(canceled);
    }

    /// <summary>Registers a parameterless delegate to be called when this token is canceled.</summary>
    /// <param name="callback">The delegate to invoke on cancellation.</param>
    /// <returns>A registration handle for the callback.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
    public CancellationTokenRegistration Register(Action callback)
    {
        if (callback == null)
            throw new ArgumentNullException("callback");

        // The parameterless Action travels as the state object; the shunt unwraps and invokes it.
        return Register(s_ActionToActionObjShunt, callback, false, true);
    }

    /// <summary>Registers a parameterless delegate to be called when this token is canceled.</summary>
    /// <param name="callback">The delegate to invoke on cancellation.</param>
    /// <param name="useSynchronizationContext">
    /// Whether to capture <see cref="SynchronizationContext.Current"/> at registration time.
    /// </param>
    /// <returns>A registration handle for the callback.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
    public CancellationTokenRegistration Register(Action callback, bool useSynchronizationContext)
    {
        if (callback == null)
            throw new ArgumentNullException("callback");

        return Register(s_ActionToActionObjShunt, callback, useSynchronizationContext, true);
    }

    /// <summary>Registers a delegate taking a state object, to be called when this token is canceled.</summary>
    /// <param name="callback">The delegate to invoke on cancellation.</param>
    /// <param name="state">The value passed to <paramref name="callback"/>.</param>
    /// <returns>A registration handle for the callback.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
    public CancellationTokenRegistration Register(Action<Object> callback, Object state)
    {
        if (callback == null)
            throw new ArgumentNullException("callback");

        return Register(callback, state, false, true);
    }

    /// <summary>
    /// Registers a delegate that will be called when this CancellationToken is canceled.
    /// </summary>
    /// <param name="callback">The delegate to invoke on cancellation.</param>
    /// <param name="state">The value passed to <paramref name="callback"/>.</param>
    /// <param name="useSynchronizationContext">
    /// Whether to capture <see cref="SynchronizationContext.Current"/> at registration time.
    /// </param>
    /// <returns>A registration handle for the callback.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="callback"/> is null (raised by the private overload this delegates to).
    /// </exception>
    public CancellationTokenRegistration Register(Action<Object> callback, Object state, bool useSynchronizationContext)
    {
        return Register(callback, state, useSynchronizationContext, true);
    }

    /// <summary>
    /// Core registration routine shared by all public overloads: validates the callback,
    /// optionally captures the synchronization and execution contexts, and hands everything
    /// to <c>m_source.InternalRegister</c>.
    /// </summary>
    /// <param name="callback">The delegate to invoke on cancellation.</param>
    /// <param name="state">The value passed to <paramref name="callback"/>.</param>
    /// <param name="useSynchronizationContext">Whether to capture <see cref="SynchronizationContext.Current"/>.</param>
    /// <param name="useExecutionContext">Whether to capture the current <see cref="ExecutionContext"/>.</param>
    /// <returns>
    /// The registration produced by the source, or a default (dummy) registration when this
    /// token can never be canceled.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
    private CancellationTokenRegistration Register(Action<Object> callback, Object state, bool useSynchronizationContext, bool useExecutionContext)
    {
        StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;

        if (callback == null)
            throw new ArgumentNullException("callback");

        if (CanBeCanceled == false)
        {
            // Nothing to do for tokens that can never reach the canceled state.
            // Give them a dummy registration.
            return new CancellationTokenRegistration();
        }

        // Contexts are captured only while cancellation has not been requested yet;
        // otherwise both stay null.
        SynchronizationContext capturedSyncContext = null;
        ExecutionContext capturedExecutionContext = null;
        if (!IsCancellationRequested)
        {
            if (useSynchronizationContext)
                capturedSyncContext = SynchronizationContext.Current;
            if (useExecutionContext)
                capturedExecutionContext = ExecutionContext.Capture(ref stackMark, ExecutionContext.CaptureOptions.OptimizeDefaultCase);
        }

        // Register the callback with the source.
        return m_source.InternalRegister(callback, state, capturedSyncContext, capturedExecutionContext);
    }

    // Cached adapter that lets a parameterless Action be registered through the
    // Action<Object> code path (the Action itself is passed as the state object).
    private readonly static Action<Object> s_ActionToActionObjShunt = new Action<Object>(ActionToActionObjShunt);

    /// <summary>Unwraps <paramref name="obj"/> as an <see cref="Action"/> and invokes it.</summary>
    /// <param name="obj">Expected to be the <see cref="Action"/> supplied at registration.</param>
    private static void ActionToActionObjShunt(object obj)
    {
        Action action = obj as Action;
        Contract.Assert(action != null, "Expected an Action here");
        action();
    }

    /// <summary>Gets a default token, i.e. one with no source that can never be canceled.</summary>
    public static CancellationToken None
    {
        get { return default(CancellationToken); }
    }

    /// <summary>Gets whether the underlying source exists and reports that cancellation was requested.</summary>
    public bool IsCancellationRequested
    {
        get { return m_source != null && m_source.IsCancellationRequested; }
    }

    /// <summary>Gets whether the underlying source exists and reports that it can be canceled.</summary>
    public bool CanBeCanceled
    {
        get { return m_source != null && m_source.CanBeCanceled; }
    }

    /// <summary>Throws if cancellation has been requested for this token; otherwise does nothing.</summary>
    /// <exception cref="OperationCanceledException">Cancellation has been requested.</exception>
    public void ThrowIfCancellationRequested()
    {
        if (IsCancellationRequested)
            ThrowOperationCanceledException();
    }

    // Kept as a separate method so the throw itself lives outside ThrowIfCancellationRequested.
    private void ThrowOperationCanceledException()
    {
        throw new OperationCanceledException(Environment.GetResourceString("OperationCanceled"), this);
    }
}
CancellationToken的很多属性都是来源于CancellationTokenSource的属性,CancellationToken的主要方法 Register 也是调用CancellationTokenSource的InternalRegister方法。InternalRegister方法检查当前是否发起了Cancel【IsCancellationRequested】,如果是则直接调用回调方法callback(stateForCallback);,否则把回调方法包装成CancellationCallbackInfo实例,然后添加到m_registeredCallbacksLists对象中,最后再返回CancellationTokenRegistration实例。
/// <summary>
/// Bundles a cancellation callback with the state object, synchronization context,
/// execution context and owning source that were supplied when it was created.
/// </summary>
internal class CancellationCallbackInfo
{
    internal readonly Action<object> Callback;
    internal readonly object StateForCallback;
    internal readonly SynchronizationContext TargetSyncContext;
    internal readonly ExecutionContext TargetExecutionContext;
    internal readonly CancellationTokenSource CancellationTokenSource;

    /// <summary>Stores all arguments as-is; no validation is performed.</summary>
    /// <param name="callback">The delegate to invoke.</param>
    /// <param name="stateForCallback">The value passed to <paramref name="callback"/>.</param>
    /// <param name="targetSyncContext">The captured synchronization context, or null.</param>
    /// <param name="targetExecutionContext">The captured execution context, or null.</param>
    /// <param name="cancellationTokenSource">The source this callback belongs to.</param>
    internal CancellationCallbackInfo(
        Action<object> callback,
        object stateForCallback,
        SynchronizationContext targetSyncContext,
        ExecutionContext targetExecutionContext,
        CancellationTokenSource cancellationTokenSource)
    {
        Callback = callback;
        StateForCallback = stateForCallback;
        TargetSyncContext = targetSyncContext;
        TargetExecutionContext = targetExecutionContext;
        CancellationTokenSource = cancellationTokenSource;
    }

    // Lazily created, shared delegate for ExecutionContext.Run, so one is not allocated per call.
    private static ContextCallback s_executionContextCallback;

    /// <summary>
    /// Invokes <see cref="Callback"/> with <see cref="StateForCallback"/>. When an execution
    /// context was captured the call goes through <see cref="ExecutionContext.Run"/>;
    /// otherwise the callback is invoked directly.
    /// </summary>
    internal void ExecuteCallback()
    {
        if (TargetExecutionContext != null)
        {
            // Read the static into a local first so the delegate passed to Run is never null.
            var callback = s_executionContextCallback;
            if (callback == null)
                s_executionContextCallback = callback = new ContextCallback(ExecutionContextCallback);

            ExecutionContext.Run(TargetExecutionContext, callback, this);
        }
        else
        {
            ExecutionContextCallback(this);
        }
    }

    /// <summary>Unwraps the <see cref="CancellationCallbackInfo"/> and runs its callback.</summary>
    /// <param name="obj">Expected to be a <see cref="CancellationCallbackInfo"/>.</param>
    private static void ExecutionContextCallback(object obj)
    {
        CancellationCallbackInfo callbackInfo = obj as CancellationCallbackInfo;
        Contract.Assert(callbackInfo != null);
        callbackInfo.Callback(callbackInfo.StateForCallback);
    }
}

/// <summary>
/// A collection of reference-typed elements stored in a chain of array fragments linked
/// through <c>m_next</c>/<c>m_prev</c>. Empty slots are null; elements are placed into
/// slots with compare-and-swap.
/// </summary>
/// <typeparam name="T">The element type; must be a reference type so null can mark a free slot.</typeparam>
internal class SparselyPopulatedArray<T> where T : class
{
    private readonly SparselyPopulatedArrayFragment<T> m_head;
    private volatile SparselyPopulatedArrayFragment<T> m_tail;

    /// <summary>Creates the array with a single fragment of the given size.</summary>
    /// <param name="initialSize">Number of slots in the first fragment.</param>
    internal SparselyPopulatedArray(int initialSize)
    {
        m_head = m_tail = new SparselyPopulatedArrayFragment<T>(initialSize);
    }

    /// <summary>Gets the fragment currently recorded as the tail of the chain.</summary>
    internal SparselyPopulatedArrayFragment<T> Tail
    {
        get { return m_tail; }
    }

    /// <summary>
    /// Stores <paramref name="element"/> in a free slot, searching fragments from the tail
    /// backwards and appending a new fragment when no slot could be claimed.
    /// </summary>
    /// <param name="element">The element to store.</param>
    /// <returns>The fragment and index at which the element was stored.</returns>
    internal SparselyPopulatedArrayAddInfo<T> Add(T element)
    {
        while (true)
        {
            // Get the tail, and ensure it's up to date.
            SparselyPopulatedArrayFragment<T> tail = m_tail;
            while (tail.m_next != null)
                m_tail = (tail = tail.m_next);

            // Search for a free index, starting from the tail.
            SparselyPopulatedArrayFragment<T> curr = tail;
            while (curr != null)
            {
                const int RE_SEARCH_THRESHOLD = -10; // Every 10 skips, force a search.

                // A fragment believed to be full is counted as skipped by pushing its
                // free-count hint further below zero.
                if (curr.m_freeCount < 1)
                    --curr.m_freeCount;

                if (curr.m_freeCount > 0 || curr.m_freeCount < RE_SEARCH_THRESHOLD)
                {
                    int c = curr.Length;

                    // Use the free-count hint to pick where to begin scanning.
                    int start = ((c - curr.m_freeCount) % c);
                    if (start < 0)
                    {
                        start = 0;
                        curr.m_freeCount--; // Too many free elements; fix up.
                    }
                    Contract.Assert(start >= 0 && start < c, "start is outside of bounds");

                    // Now walk the array until we find a free slot (or reach the end).
                    for (int i = 0; i < c; i++)
                    {
                        // If the slot is null, try to CAS our element into it.
                        int tryIndex = (start + i) % c;
                        Contract.Assert(tryIndex >= 0 && tryIndex < curr.m_elements.Length, "tryIndex is outside of bounds");

                        if (curr.m_elements[tryIndex] == null &&
                            Interlocked.CompareExchange(ref curr.m_elements[tryIndex], element, null) == null)
                        {
                            // Keep the hint non-negative after a successful insert.
                            int newFreeCount = curr.m_freeCount - 1;
                            curr.m_freeCount = newFreeCount > 0 ? newFreeCount : 0;
                            return new SparselyPopulatedArrayAddInfo<T>(curr, tryIndex);
                        }
                    }
                }

                curr = curr.m_prev;
            }

            // If we got here, we need to add a new chunk to the tail and try again.
            // The new fragment is twice the tail's length, unless the tail is exactly 4096 long.
            SparselyPopulatedArrayFragment<T> newTail = new SparselyPopulatedArrayFragment<T>(
                tail.m_elements.Length == 4096 ? 4096 : tail.m_elements.Length * 2, tail);

#pragma warning disable 0420 // passing a volatile field by ref to Interlocked is intentional
            if (Interlocked.CompareExchange(ref tail.m_next, newTail, null) == null)
            {
                m_tail = newTail;
            }
#pragma warning restore 0420
        }
    }
}

/// <summary>
/// Result of <see cref="SparselyPopulatedArray{T}.Add"/>: the fragment and slot index
/// where the element was stored.
/// </summary>
/// <typeparam name="T">The element type.</typeparam>
internal struct SparselyPopulatedArrayAddInfo<T> where T : class
{
    private SparselyPopulatedArrayFragment<T> m_source;
    private int m_index;

    /// <summary>Records the fragment/index pair.</summary>
    /// <param name="source">The fragment holding the element; asserted non-null.</param>
    /// <param name="index">The slot index; asserted to be within the fragment's length.</param>
    internal SparselyPopulatedArrayAddInfo(SparselyPopulatedArrayFragment<T> source, int index)
    {
        Contract.Assert(source != null);
        Contract.Assert(index >= 0 && index < source.Length);
        m_source = source;
        m_index = index;
    }

    /// <summary>Gets the fragment holding the element.</summary>
    internal SparselyPopulatedArrayFragment<T> Source
    {
        get { return m_source; }
    }

    /// <summary>Gets the slot index within <see cref="Source"/>.</summary>
    internal int Index
    {
        get { return m_index; }
    }
}

/// <summary>
/// One fixed-size segment of a <see cref="SparselyPopulatedArray{T}"/>, linked to its
/// neighbours through <c>m_next</c> and <c>m_prev</c>.
/// </summary>
/// <typeparam name="T">The element type.</typeparam>
internal class SparselyPopulatedArrayFragment<T> where T : class
{
    internal readonly T[] m_elements; // The contents, sparsely populated (with nulls).
    internal volatile int m_freeCount; // A hint of the number of free elements.
    internal volatile SparselyPopulatedArrayFragment<T> m_next; // The next fragment in the chain.
    internal volatile SparselyPopulatedArrayFragment<T> m_prev; // The previous fragment in the chain.

    /// <summary>Creates a fragment with no predecessor.</summary>
    /// <param name="size">Number of slots.</param>
    internal SparselyPopulatedArrayFragment(int size) : this(size, null)
    {
    }

    /// <summary>Creates a fragment of <paramref name="size"/> empty slots linked back to <paramref name="prev"/>.</summary>
    /// <param name="size">Number of slots; also the initial free-count hint.</param>
    /// <param name="prev">The preceding fragment, or null.</param>
    internal SparselyPopulatedArrayFragment(int size, SparselyPopulatedArrayFragment<T> prev)
    {
        m_elements = new T[size];
        m_freeCount = size;
        m_prev = prev;
    }

    /// <summary>Performs a volatile read of the slot at <paramref name="index"/>.</summary>
    internal T this[int index]
    {
        get { return Volatile.Read<T>(ref m_elements[index]); }
    }

    /// <summary>Gets the number of slots in this fragment.</summary>
    internal int Length
    {
        get { return m_elements.Length; }
    }

    /// <summary>Gets the preceding fragment, or null.</summary>
    internal SparselyPopulatedArrayFragment<T> Prev
    {
        get { return m_prev; }
    }

    /// <summary>
    /// Atomically clears the slot at <paramref name="index"/> if it currently holds
    /// <paramref name="expectedElement"/>.
    /// </summary>
    /// <param name="index">The slot to clear.</param>
    /// <param name="expectedElement">The element expected to be in the slot.</param>
    /// <returns>The value that was in the slot before the call.</returns>
    internal T SafeAtomicRemove(int index, T expectedElement)
    {
        T prevailingValue = Interlocked.CompareExchange(ref m_elements[index], null, expectedElement);

        // NOTE(review): the hint is bumped whenever the slot was non-null, even if it held a
        // different element and nothing was removed. Preserved as-is since m_freeCount is only a hint.
        if (prevailingValue != null)
            ++m_freeCount;

        return prevailingValue;
    }
}
回头看CancellationCallbackInfo的实现也很简单。