Дополнительные возможности AsyncEnumerator

ОГЛАВЛЕНИЕ

В этой статье я хочу продемонстрировать некоторые дополнительные функции, предоставляемые AsyncEnumerator, такие как соединение между несколькими параллельными асинхронными операциями, поддержку модели асинхронного программирования (Asynchronous Programming Model – APM), возвращаемые значения, управляемые потоком обратного вызова, синхронизированный доступ к общим данным, автоматическое удаление незавершенных операций и поддержка отмены/времени ожидания. Попутно я также разберу для читателей некоторые распространенные шаблоны программирования, ставшие возможными с помощью AsyncEnumerator.

Присоединение к параллельным асинхронным операциям

Одной из положительных сторон выполнения асинхронных операций является то, что несколько из них можно выполнять параллельно, намного улучшая производительность приложения. Например, если три асинхронных запроса веб-служб инициализировать параллельно и если на завершение каждого запроса уходит 5 секунд, то общее время, которое программа проведет в ожидании – всего 5 секунд. С другой стороны, в случае выполнения синхронных запросов веб-служб приложению придется ждать завершения каждого из них перед инициализацией следующего. Так что выполнение трех синхронных запросов веб-служб, каждый из которых занимает 5 секунд, означает, что приложение будет ждать как минимум 15 секунд.

С помощью AsyncEnumerator очень просто инициализировать несколько параллельных асинхронных операций. Код после этого может обработать завершенные операции после завершения их всех, или по мере завершения каждой из них. В итераторе на рис. 1 показаны обе методики обработки завершенных операций. Запустив его, я получил следующий результат (обратите внимание, что необходимо ждать истечения времени веб-запроса):

All the operations completed:
  Uri=http://wintellect.com/    ContentLength=41207
  Uri=http://www.devscovery.com/  ContentLength=13258
  Uri=http://1.1.1.1/ WebException=Unable to connect to remote server
An operation completed:
  Uri=http://wintellect.com/    ContentLength=41208
An operation completed:
  Uri=http://www.devscovery.com/  ContentLength=13258
An operation completed:
  Uri=http://1.1.1.1/  WebException=Unable to connect to remote server

Рис. 1. Координация нескольких асинхронных операций

public static class AsyncEnumeratorPatterns {
  public static void Main() {
   String[] urls = new String[] { 
    "http://Wintellect.com/", 
    "http://1.1.1.1/",  // Demonstrates error recovery
    "http://www.Devscovery.com/" 
   };

   // Demonstrate process
   AsyncEnumerator ae = new AsyncEnumerator();
   ae.Execute(ProcessAllAndEachOps(ae, urls));
  }

  private static IEnumerator<Int32> ProcessAllAndEachOps(
    AsyncEnumerator ae, String[] urls) {
   Int32 numOps = urls.Length;

   // Issue all the asynchronous operation(s) so they run concurrently
   for (Int32 n = 0; n < numOps; n++) {
    WebRequest wr = WebRequest.Create(urls[n]);
    wr.BeginGetResponse(ae.End(), wr);
   }

   // Have AsyncEnumerator wait until ALL operations complete
   yield return numOps;

   Console.WriteLine("All the operations completed:");
   for (Int32 n = 0; n < numOps; n++) {
    ProcessCompletedWebRequest(ae.DequeueAsyncResult());
   }

   Console.WriteLine(); // *** Blank line between demos ***

   // Issue all the asynchronous operation(s) so they run concurrently
   for (Int32 n = 0; n < numOps; n++) {
    WebRequest wr = WebRequest.Create(urls[n]);
    wr.BeginGetResponse(ae.End(), wr);
   }

   for (Int32 n = 0; n < numOps; n++) {
    // Have AsyncEnumerator wait until EACH operation completes
    yield return 1;

    Console.WriteLine("An operation completed:");
    ProcessCompletedWebRequest(ae.DequeueAsyncResult());
   }
  }

  private static void ProcessCompletedWebRequest(IAsyncResult ar) {
   WebRequest wr = (WebRequest)ar.AsyncState;
   try {
    Console.Write("  Uri=" + wr.RequestUri + "  ");
    using (WebResponse response = wr.EndGetResponse(ar)) {
     Console.WriteLine("ContentLength=" + response.ContentLength);
    }
   }
   catch (WebException e) {
    Console.WriteLine("WebException=" + e.Message);
   }
   }
}

Код в начале итератора выдает несколько асинхронных операций и затем исполняет оператор yield return numOps. Этот оператор указывает AsyncEnumerator не проводить обратных вызовов к коду, пока не будет закончено столько операций, сколько указано значением переменной numOps. Код сразу под оператором yield return затем исполняет цикл для обработки всех завершенных операций.

Отметьте, что порядок завершения операций может отличаться от порядка их выдачи. Для соотнесения каждого результата с его запросом я передал переменную wr, которая определяет объект WebRequest, используемый мною для инициации запроса, в последнем аргументе BeginGetResponse. Затем в методе ProcessCompletedWebRequest я извлекаю объект WebRequest, использованный для инициации запроса, из свойства AsyncState объекта IAsyncResult.

Код внизу итератора также выдает несколько асинхронных операций и затем входит в цикл для обработки каждой операции по мере ее завершения. Однако итератор должен сперва дождаться завершения каждой операции. Это выполняется оператором yield return 1, который указывает AsyncEnumerator произвести обратный вызов к коду итератора, как только завершится одна из операций.


Поддержка APM

В моей предыдущей статье я объяснил, как вызов метода Execute AsyncEnumerator начинает исполнение кода итератора. Однако я также объяснил, что поток, вызывающий Execute, будет блокироваться до выхода из итератора или исполнения им оператора yield break.

Блокировка потока может повредить масштабируемости приложения и крайне нежелательна, особенно в серверных приложениях. Она также вредит скорости ответа при вызове потоком графического интерфейса пользователя, поскольку на исполнение итератора уходит неопределенный промежуток времени, и в течение этого времени приложение Windows® Forms или Windows Presentation Foundation (WPF) не будет реагировать на ввод. Вызывать Execute определено стоит только при написании тестового кода или экспериментах с методом итератора.

Для рабочего кода следует вызывать такие методы AsyncEnumerator, как BeginExecute и EndExecute. Внутренне, при вызове BeginExecute, объект AsyncEnumerator конструирует экземпляр класса AsyncResultNoResult, о котором я рассказывал в выпуске журнала MSDN® Magazine за март 2007 года (msdn.microsoft.com/magazine/cc163467). При вызове BeginExecute можно передать ссылку на ваш собственный метод AsyncCallback, и объект AsyncEnumerator вызовет этот метод, когда итератор завершит его исполнение. Затем этот метод должен вызвать метод EndExecute из AsyncEnumerator для получения результатов итератора. Ниже я покажу несколько примеров, в которых я пользуюсь преимуществами методов BeginExecute и EndExecute. Такой метод выглядит следующим образом:

public class AsyncEnumerator<TResult>: AsyncEnumerator {
  public IAsyncResult BeginExecute(
   IEnumerator<Int32> enumerator,
   AsyncCallback callback, Object state);

  public void EndExecute(IAsyncResult result);
}

Также, поскольку AsyncEnumerator поддерживает APM, его можно интегрировать со всеми моделями приложений Microsoft .NET Framework, поскольку они уже поддерживают APM. Это значит, что AsyncEnumerator можно использовать с приложениями веб-форм ASP.NET, веб-службами XML ASP.NET, службами Windows Communication Foundation (WCF), приложениями Windows Forms, приложениями WPF, консольными приложениями, службами Windows и так далее.

Также стоит указать, что поскольку AsyncEnumerator поддерживает APM, его можно использовать внутри другого итератора для компоновки асинхронных операций. Например, можно создать итератор, который знает, как асинхронно выполнить запрос к базе данных и обработать его результат, когда тот появится. Я называю это подпрограммой-итератором. Затем внутри другого итератора можно инициализировать несколько запросов к базе данных, вызвав подпрограмму-итератор в цикле. Для каждой итерации цикла можно создать AsyncEnumerator и вызывать его метод BeginExecute, передавая ему имя подпрограммы-итератора и любые дополнительные аргументы, какие необходимо.

Обратите внимание, что эта модель дает важное преимущество: все подпрограммы-итераторы работают параллельно, не блокируя потоков (если лежащая в основе реализация APM не блокирует потоки, скажем в случае применения BeginXxx для постановки в очередь к пулу потоков ThreadPool делегата, ставящего блок до завершения какой-либо операции). Это позволяет создать простой итератор, включающий в себя одну асинхронную операцию и вызывающий ее изнутри других итераторов, сохраняя при этом масштабируемость и скорость ответа.


Возвращаемые значения

Во многих ситуациях полезно, чтобы итератор возвращал результат по завершении всех его процессов. Однако, итератор не может возвратить значение по завершении, поскольку в итераторе не может быть возвращаемого оператора. А операторы yield return возвращают значение для каждой итерации, не окончательное значение.

Если необходимо, чтобы по завершении обработки итератор возвращал окончательное значение, то можно воспользоваться моим вспомогательным классом AsyncEnumerator<TResult>. Модель этого класса показана здесь:

public class AsyncEnumerator<TResult>: AsyncEnumerator {
  public AsyncEnumerator();
  public TResult Result { get; set; }

  new public TResult Execute(IEnumerator<Int32> enumerator);
  new public TResult EndExecute(IAsyncResult result);
}

Использовать AsyncEnumerator<TResult> довольно просто. Сперва измените код, чтобы он создавал экземпляр AsyncEnumerator<TResult> вместо нормального AsyncEnumerator. Для TResult укажите тип, который итератору следует возвращать в итоге. Далее, измените часть кода, вызывающую метод Execute или EndExecute (который ранее возвращал void), чтобы получить возвращаемое значение, и используйте это значение как угодно.

Далее, измените код итератора, чтобы он принимал AsyncEnumerator<TResult> вместо AsyncEnumerator. Само собой, необходимо указать тот же тип данных для универсального параметра TResult. Наконец, внутри кода итератора установите свойство Result объекта AsyncEnumerator<TResult> на значение, которое должен возвратить итератор.

Чтобы помочь в сведении всего этого вместе, на рис. 2 показан код, реализующий простую асинхронную веб-службу ASP.NET, которая одновременно запрашивает код HTML для нескольких различных веб-узлов (передаваемых как разделенная запятыми строка). После получения всех данных о веб-узлах веб-служба возвращает массив строк, где каждый элемент показывает URL-адрес веб-узла и число байтов, загруженных с веб-узла, либо ошибку, если произошла ошибка.

Рис. 2. Одновременное получение нескольких веб-узлов

[WebService(Namespace = "http://tempuri.org/")]
[WebServiceBinding(ConformsTo = WsiProfiles.BasicProfile1_1)]
public class Service : System.Web.Services.WebService {
  private static List<String[]> s_recentRequests = new List<String[]>(10);
  private static SyncGate s_recentRequestsSyncGate = new SyncGate();

  private AsyncEnumerator<String[]> m_webSiteDataLength;

  [WebMethod]
  public IAsyncResult BeginGetWebSiteDataLength(
   String uris, AsyncCallback callback, Object state) {

   // Construct an AsyncEnumerator that will eventually return a String[]
   m_webSiteDataLength = new AsyncEnumerator<String[]>();

   // NOTE: The AsyncEnumerator automatically saves the ASP.NET 
   // SynchronizationContext with it ensuring that the iterator 
   // always executes using the correct IPrincipal, 
   // CurrentCulture, and CurrentUICulture.

   // Initiate the iterator asynchronously. 
   return m_webSiteDataLength.BeginExecute(
    GetWebSiteDataLength(m_webSiteDataLength, uris.Split(',')), 
              callback, state);
   // NOTE: Since the AsyncEnumerator's BeginExecute method returns an 
   // IAsyncResult, we can just return this back to ASP.NET
  }

  private IEnumerator<Int32> GetWebSiteDataLength(
       AsyncEnumerator<String[]> ae, String[] uris) {

   // Issue several web request simultaneously
   foreach (String uri in uris) {
    WebRequest webRequest = WebRequest.Create(uri);
    webRequest.BeginGetResponse(ae.End(), webRequest);
   }

   yield return uris.Length; // Wait for ALL the web requests to complete

   // Construct the String[] that will be the ultimate result
   ae.Result = new String[uris.Length];

   for (Int32 n = 0; n < uris.Length; n++) {
    // Grab the result of a completed web request
    IAsyncResult result = ae.DequeueAsyncResult();

    // Get the WebRequest object used to initate the request 
    WebRequest webRequest = (WebRequest)result.AsyncState;

    // Build the String showing the result of this completed web request
    ae.Result[n] = "URI=" + webRequest.RequestUri + ", ";

    using (WebResponse webResponse = webRequest.EndGetResponse(result)) {
     ae.Result[n] += "ContentLength=" + webResponse.ContentLength;
    }
   }

   // Modify the collection of most-recent queries
   s_recentRequestsSyncGate.BeginRegion(SyncGateMode.Exclusive, ae.End());
   yield return 1;  // Continue when collection can be updated (modified)

   // If collection is full, remove the oldest item
   if (s_recentRequests.Count == s_recentRequests.Capacity)
    s_recentRequests.RemoveAt(0);

   s_recentRequests.Add(ae.Result);
   s_recentRequestsSyncGate.EndRegion(ae.DequeueAsyncResult());  
  // Updating is done //
  }

  // ASP.NET calls this method when the iterator completes. 
  [WebMethod]
  public String[] EndGetWebSiteDataLength(IAsyncResult result) {
   return m_webSiteDataLength.EndExecute(result);
  }

  private AsyncEnumerator<String[][]> m_aeRecentRequests;

  [WebMethod]
  public IAsyncResult BeginGetRecentRequests(AsyncCallback callback, 
                       Object state) {
   m_aeRecentRequests = new AsyncEnumerator<String[][]>();
   return m_aeRecentRequests.BeginExecute(GetRecentRequests(m
    _aeRecentRequests), callback, state);
  }

  private IEnumerator<Int32> GetRecentRequests(
   AsyncEnumerator<String[][]> ae) {
   // In a shared way, read the collection of most-recent requests
   s_recentRequestsSyncGate.BeginRegion(SyncGateMode.Shared, ae.End());
   yield return 1;  // Continue when collection can be examined (read)

   // Return a copy of the collection as an array
   ae.Result = s_recentRequests.ToArray();
   s_recentRequestsSyncGate.EndRegion(ae.DequeueAsyncResult());  
// Reading is done
  }

  [WebMethod]
  public String[][] EndGetRecentRequests(IAsyncResult result) {
   return m_aeRecentRequests.EndExecute(result);
  }
}


Управление потоком обратного вызова с помощью контекстов синхронизации

По мере завершения асинхронных операции, различные потоки пула потоков пробуждаются для уведомления объекта AsyncEnumerator. Если AsyncEnumerator использует эти потоки для обратного вызова к итератору, то код итератора может быть исполнен различными потоками, даже если код итератора может исполнять внутри себя не более одного потока. В некоторых ситуациях исполнение кода итератора различными потоками может доставить проблемы. Например, в приложении Windows Forms или WPF элемент управления должен управляться потоком, создавшим его, и это не может быть поток пула потоков.

Исполнение кода итератора через произвольные потоки пула потоков может создать другую проблему. В случае, например, приложения ASP.NET, когда впервые приходит запрос клиента, ASP.NET связывает IPrincipal клиента (для олицетворения), а также информацию о культуре со свойствами CurrentPrincipal, CurrentCulture и CurrentUICulture потока пула потоков. Однако, если использовать этот поток для вызова какого-либо метода BeginXxx, то при исполнении нового потока пула потоков для уведомления пользователя о завершении операции в новом потоке пула потоков эти свойства не будут установлены верно по умолчанию.

Чтобы помочь в решении этих проблем, CLR позволяет каждому потоку иметь связанный с ним объект, производный от SynchronizationContext. Этот поток используется, чтобы помочь в поддержании потоковой модели, применяемой моделью приложений. Для Windows Forms и WPF, их производные от SynchronizationContext объекты знают, как передавать вызов функции (сделанный потоком пула потоков) потоку графического интерфейса пользователя. Что касается ASP.NET, то ее производный от SynchronizationContext объект знает, как инициализировать участника и свойства культуры в каждом потоке пула потоков, используемом для обработки одного запроса.

Для обеспечения работы правильной потоковой модели приложения AsyncEnumerator предлагает свойство SyncContext. Это свойство инициализируется значением, возвращаемым статическим свойством Current («Текущий») контекста SynchronizationContext внутри конструктора AsyncEnumerator. Если оно пустое (как обычно должно быть для консоли или приложения службы Windows), то всякий раз, как поток пула потоков вызывает объект AsyncEnumerator, объект просто использует этот поток для вызова к итератору. Однако, если свойство SyncContext непустое, то объект AsyncEnumerator заставляет поток пула потоков вызвать итератор через указанный объект, производный от SynchronizationContext.

Так что для Windows Forms и приложения WPF это означает, что код итератора всегда будет исполняться через поток графического интерфейса пользователя и, следовательно, для обновления элементов управления на форме можно просто исполнить код в итераторе. Нет нужды вызывать методы элемента управления Invoke/BeginInvoke или методы диспетчера Invoke/BeginInvoke. Это делает несложным возложение на итератор вывода сообщений о ходе работы в интерфейсе пользователя при завершении асинхронных операций. Для ASP.NET это означает, что участник и свойства культуры всегда будут установлены верно при исполнении кода итератора. И код на рис. 2 и пример Windows Forms, который я покажу ниже, пользуются этой функцией.


Синхронизация доступа к общим данным

В некоторых ситуациях, особенно относящихся к серверу, может существовать несколько объектов AsyncEnumerator (по одному на клиентский запрос), каждый из которых одновременно обрабатывает собственные итераторы. Для примера, представьте себе веб-узел, который получает доступ к некоей базе данных и затем обновляет набор объектов в памяти. Для доступа к базе данных определенно стоит использовать APM (например, вызывая BeginExecuteReader из класса SqlCommand), после чего необходимо обновить находящиеся в памяти объекты поддерживающим потоки образом. В обычной ситуации здесь можно использовать методы класса Monitor или выражение блокировки C#, или, возможно, класс ReaderWriterLockSlim, поставляемый в комплекте с .NET Framework 3.5. Однако все эти блокировки могут заблокировать вызывающий поток, вредя масштабируемости и скорости ответа. Чтобы избежать блокировки потоков, я обычно использую мой класс ReaderWriterGate, который я описал в своей статье за май 2006 года (см. msdn.microsoft.com/magazine/cc163532).

Когда я начал использовать ReaderWriterGate с AsyncEnumerator, я осознал, что объектная модель ReaderWriterGate может быть улучшена для более качественной интеграции с AsyncEnumerator. Так что я создал новый класс SyncGate, поведение которого очень похоже на ReaderWriterGate. Вот его модель:

  public sealed class SyncGate {
   public SyncGate();
   public void BeginRegion(SyncGateMode mode, 
    AsyncCallback asyncCallback); 
   public void EndRegion(IAsyncResult ar); 
  }
  public enum SyncGateMode { Exclusive, Shared }

При наличии нескольких работающих итераторов, желающих получить доступ к общим данным, сперва сконструируйте SyncGate и сохраните ссылку на него в статическом поле, либо как-нибудь направьте ссылку на него различным итераторам. Затем внутри итераторов, прямо перед кодом, касающимся общих данных, вызовите принадлежащий SyncGate метод BeginRegion, указывающий, необходим ли коду исключительный (запись) или общий (чтение) доступ к данным. Затем пусть итератор применит yield return 1. На этом этапе итератор отпустит свой поток, и когда итератор получит безопасный доступ к данным, AsyncEnumerator автоматически произведет обратный вызов к коду разработчика. Это значит, что потоки не будут блокироваться, когда они ожидают доступа к общим данным.

В итераторе, прямо после кода, касающегося общих данных, вызовите EndRegion. Это укажет SyncGate, что код закончил касаться общих данных и позволяет другим итераторам получить к ним доступ, если им это надо. На рис. 2 внизу итератора GetWebSiteDataLength используется SyncGate для исключительного доступа к статической коллекции. Также на рис. 2 итератор GetRecentRequests показывает, как получить общий доступ к той же самой коллекции.


Группы отказа

Другой функцией, предлагаемой классом AsyncEnumerator, являются группы отказа. Эта функция позволяет итератору выпустить несколько параллельных асинхронных операций и затем решить, что его не интересуют некоторые (или все) из них, заставляя объект AsyncEnumerator автоматически отказываться от результатов при завершении остающихся операций.

Для примера, представьте себе код, который желает получить температуру в городе. Существует много веб-служб, которым можно дать запрос на эту информацию. Можно написать итератор, запрашивающий для получения этой информации три веб-службы и как только любая из них возвратит температуру, можно заранее отказаться от результатов двух других веб-служб. Кое-кто использует этот шаблон для улучшения производительности своих приложений.

Еще один пример: итератор исполняет последовательность операций, а пользователь просто хочет отменить их, потому что ему надоело ждать или он передумал. Существует похожий случай, когда пользователь запускает асинхронную операцию, но желает отказаться ото всех операций, которые не успели завершиться за какой-то определенный промежуток времени. Например, некоторые веб-службы обрабатывают запрос клиента, и если вся обработка не может завершиться, скажем, за две секунды, служба следует уведомить клиента, что его запрос не удался, чем заставлять его ждать ответа до бесконечности.

Группы отказа используются так: внутри итератора пакет связанных операций сводится вместе как часть группы отказа. Группа отказа – это просто значение Int32 в промежутке от 0 до 63 включительно. Например, можно выпустить группу методов BeginXxx, указывая, что все они являются частью группы отказа 0. Тогда итератор сможет обработать некоторые из них по мере их завершения. Когда в коде итератора решено, что больше не следует обрабатывать операции, являющиеся частью этой группы отказа, вызывается принадлежащий AsyncEnumerator метод DiscardGroup:

public void DiscardGroup(Int32 discardGroup);

Этот метод указывает объекту AsyncEnumerator отказываться ото всех остающихся операций, созданных как часть указанной группы отказа, так что код итератора не будет видеть эти операции при вызове DequeueAsyncResult. Увы, этого не вполне достаточно, поскольку APM .NET требует, чтобы методы EndXxx вызывались для каждого метода BeginXxx, иначе возможна утечка ресурсов.

Чтобы удовлетворить это требование, AsyncEnumerator должен вызвать верный метод EndXxx для каждой операции, от которой он отказывается. Поскольку у AsyncEnumerator нет возможности самостоятельно определить верный метод EndXxx, этот метод необходимо указать. При вызове метода BeginXxx вместо простой передачи ae.End для аргумента AsyncCallback необходимо передать один из следующих методов:

AsyncCallback End(Int32 discardGroup, EndObjectXxx callback);
AsyncCallback EndVoid(Int32 discardGroup, EndVoidXxx callback);

EndObjectXxx и EndVoidXxx являются делегатами, определенными следующим образом:

delegate Object EndObjectXxx(IAsyncResult result);
delegate void EndVoidXxx(IAsyncResult result);

Если у метода BeginXxx имеется соответствующий метод EndXxx, возвращающий значение, то будет вызван только что показанный метод End. Если вызвать метод BeginXxx, имеющий соответствующий метод EndXxx, который возвращает пустоту (необычный случай), то будет вызван метод EndVoid. Теперь каждый раз, когда вы указываете AsyncEnumerator отклонить группу, он будет знать, какой метод EndXxx вызвать.

Отметьте, что если метод EndXxx выдаст какое угодно исключение, AsyncEnumerator уловит и проглотит его. Он делает это потому, что отклонение операции указывает – пользователя не волнует успех или неудача операции.

Следует также указать, что при выходе итератора или исполнения им оператора yield break, AsyncEnumerator автоматически отклоняет все группы отказа – выход из итератора указывает, что пользователя не волнуют операции, обработка которых не завершилась. Это может быть очень удобным, поскольку позволяет итератору создать какое-то число асинхронных операций, обработать столько завершенных операций, сколько ему нужно и просто выйти.
AsyncEnumerator автоматически очищает любые остающиеся операции, которые завершаются в будущем. Но обратите внимание, что отказ от операций не отменяет их. Если одна из ожидающих асинхронных операций вела запись в файл или обновляла базу данных, отказ от соответствующих групп не предотвратит завершение этих операций. Он просто позволит коду продолжать работу вне зависимости от того, завершены операции или нет.


Отмена

AsyncEnumerator позволяет внешнему коду прерывать работу итератора. Эта функция особенно полезна для приложений Windows Forms и WPF, поскольку она позволяет нетерпеливому пользователю отменить идущую операцию и возвратить себе контроль над приложением. AsyncEnumerator также способен прекратить собственную работу после указанного промежутка времени. Эта функция полезна для серверных приложений, которые стремятся ограничить объем времени, которого требует ответ на запрос клиента. Методы, относящиеся к отмене, показаны ниже:

public class AsyncEnumerator {
  // Call this method from inside the iterator
  public Boolean IsCanceled(out Object cancelValue);

  // Call this method from inside the iterator
  public Boolean IsCanceled();

  // Call this method from code outside the iterator
  public Boolean Cancel(Object cancelValue};

  // Call this method from code inside or outside the iterator
  public void SetCancelTimeout(Int32 milliseconds,
   Object cancelValue);
}

Чтобы воспользоваться отменой, внутри своего итератора создавайте каждую асинхронную операцию как часть группы отказа. Это позволяет AsyncEnumerator автоматически отказываться от любых операций, которые завершаются после запроса на отмену. Затем, для обнаружения запроса на отмену, включите код, подобный показанному на рис. 3, после каждого оператора yield return.

Рис. 3. Обнаружение отмены

IEnumerator<Int32> MyIterator(AsyncEnumerator ae, ...) {
  // obj refers to some object that has BeginXxx/EndXxx methods
  obj.BeginXxx(...,     // Pass any arguments as usual
   ae.End(0, obj.EndXxx), // For AsyncCallback indicate
               // discard group 0 & proper End method  
               //to call for cleanup
   null);         // BeginXxx's AsyncState argument

  // Make more calls to BeginXxx methods if desired here...

  yield return n; // Resume iterator after 'n' operations
          // complete or if cancelation occurs 

  // Check for cancellation
  Object cancelValue;
  if (ae.IsCanceled(out cancelValue)) {
   // The iterator should cancel due to user request/timeout
   // Note: It is common to call "yield break" here.
  } else {
   // Call DequeueAsyncResult 'n' times to 
   // process the completed operations
  }
}

Теперь, когда требуется начать исполнение отменяемого итератора, создается объект AsyncEnumerator, и на нем вызывается объект BeginExecute, точно так же, как это делается обычно. Затем, когда какой-то части приложения требуется прервать работу итератора, она вызывает метод Cancel («Отмена»). При вызове Cancel можно передать ему ссылку на объект, которая затем передается итератору при помощи выходного параметра метода IsCanceled. Этот объект дает коду, отменяющему работу итератора, способ сообщить итератору, почему она отменяется. Если для итератора не важно, почему он отменяется, он может вызвать перегруженный метод IsCanceled, который не принимает параметров.

Метод SetCancelTimeout может быть вызван кодом как изнутри, так и снаружи итератора. Когда вызван этот метод, он устанавливает таймер, который автоматически вызовет Cancel (передавая значение, указываемое через аргумент CancelValue) по истечении времени.

В коде на рис. 4 показано приложение Windows Forms, использующее многие из функций, обсуждавшихся в статье. В нем используется принадлежащая AsyncEnumerator функция SyncContext, чтобы обеспечить выполнение всего кода итератора через поток графического интерфейса пользователя, что позволяет обновлять элементы управления интерфейса пользователя. Этот код также показывает, как использовать поддержку APM в AsyncEnumerator, не блокируя поток графического интерфейса пользователя и позволяя интерфейсу пользователя продолжать работать.

Рис. 4. WindowsFormsViaAsyncEnumerator.cs

namespace WinFormUsingAsyncEnumerator {
  public partial class WindowsFormsViaAsyncEnumerator : Form {
   public static void Main() {
    Application.Run(new WindowsFormsViaAsyncEnumerator());
   }

   public WindowsFormsViaAsyncEnumerator() {
    InitializeComponent();
   }

   private AsyncEnumerator m_ae = null;

   private void m_btnStart_Click(object sender, EventArgs e) {
    String[] uris = new String[] {
     "http://Wintellect.com/", 
     "http://1.1.1.1/",  // Demonstrates error recovery
     "http://www.Devscovery.com/" 
    };

    m_ae = new AsyncEnumerator();

    // NOTE: The AsyncEnumerator automatically saves the 
    // Windows Forms SynchronizationContext with it ensuring
    // that the iterator always runs on the GUI thread; 
    // this allows the iterator to access the UI Controls

    // Start iterator asynchonously so that GUI thread doesn't block
    m_ae.BeginExecute(GetWebData(m_ae, uris), m_ae.EndExecute);
   }

   private IEnumerator<Int32> GetWebData(AsyncEnumerator ae, String[] uris) {
    ToggleStartAndCancelButtonState(false);
    m_lbResults.Items.Clear();

    if (m_chkAutoCancel.Checked)
     ae.SetCancelTimeout(5000, ae);

    // Issue several Web requests (all in discard group 0) simultaneously
    foreach (String uri in uris) {
     WebRequest webRequest = WebRequest.Create(uri);

     // If the AsyncEnumerator is canceled, DiscardWebRequest cleans up
     // any outstanding operations as they complete in the future
     webRequest.BeginGetResponse(ae.EndVoid(0, DiscardWebRequest), 
                              webRequest);
    }

    yield return uris.Length; // Process the completed Web requests 
                 // after all complete

    String resultStatus; // Ultimate result of processing shown to user

    // Check if iterator was canceled
    Object cancelValue;
    if (ae.IsCanceled(out cancelValue)) {
     ae.DiscardGroup(0);
     // Note: In this example calling DiscardGroup above is not mandatory
     // because the whole iterator is stopping execution; causing all
     // discard groups to be discarded automatically.

     resultStatus = (cancelValue == ae) ? "Timeout" : "User canceled";
     goto Complete;
    }

    // Iterator wasn't canceled, process all the completed operations
    for (Int32 n = 0; n < uris.Length; n++) {
     IAsyncResult result = ae.DequeueAsyncResult();

     WebRequest webRequest = (WebRequest)result.AsyncState;

     String s = "URI=" + webRequest.RequestUri + ", ";
     try {
      using (WebResponse webResponse = webRequest. 
         EndGetResponse(result)) {
           s += "ContentLength=" + webResponse.ContentLength;
      }
     }
     catch (WebException e) {
      s += "Error=" + e.Message;
     }
     m_lbResults.Items.Add(s); // Add result of operation to listbox
    }
    resultStatus = "All operations completed.";

   Complete:
    // All operations have completed or cancellation occurred, tell   // user
    MessageBox.Show(this, resultStatus);

    // Reset everything so that the user can start over if they desire
    m_ae = null;  // Reset since we're finished
    ToggleStartAndCancelButtonState(true);
   }

   private void m_btnCancel_Click(object sender, EventArgs e) {
    m_ae.Cancel(null);
    m_ae = null;
   }

   // Swap the Start/Cancel button states
   private void ToggleStartAndCancelButtonState(Boolean enableStart) {
    m_btnStart.Enabled = enableStart;
    m_btnCancel.Enabled = !enableStart;
   }

   private void DiscardWebRequest(IAsyncResult result) {
    // Get the WebRequest object used to initate the request 
    // (see BeginGetResponse's last argument)
    WebRequest webRequest = (WebRequest)result.AsyncState;

    // Clean up the async operation and Close the WebResponse (if no    // exception)
    webRequest.EndGetResponse(result).Close();
   }
  }
}

Внутри итератора многие веб-запросы создаются как часть группы отмены и, поскольку интерфейс пользователя продолжает работать, пользователь может нажать кнопку Cancel, если ему надоест ждать результатов. Если это произойдет, AsyncEnumerator автоматически завершит любые операции, так что коду итератора не придется брать на себя никакой очистки. Обратите внимание, что форма также показывает, как установить таймер, так что AsyncEnumerator отменит собственную работу автоматически через пять секунд, если не завершена ни одна из операций.

Этот образец выполняет веб-запросы, используя класс WebRequest из .NET. При вызове метода BeginGetResponse класса WebRequest очистка требует не просто вызова EndGetResponse. Необходимо также вызвать метод Close («Закрыть») или Dispose («Удалить») на объекте WebResponse, возвращаемом EndGetResponse.

По этой причине код передает метод DiscardWebRequest методу EndVoid при вызове BeginGetResponse. Метод DiscardWebRequest гарантирует, что объект WebResponse закрыт, если выполнение веб-запроса было успешным и не привело к исключению.

Многим разработчикам известно, что асинхронное программирование является ключом к повышению производительности, масштабируемости, скорости ответа и надежности их приложений, серверов и компонентов. Увы, многие разработчики отказываются принять асинхронное программирование в полной мере, поскольку его модель программирования была куда более нудной и сложной, чем опробованная на практике модель синхронного программирования.

Использование функции итератора C# и моего класса AsyncEnumerator позволяет разработчикам применить асинхронное программирование изнутри синхронной модели программирования. AsyncEnumerator также легко интегрируется с другими частями .NET Framework и предлагает множество функций, позволяющих разработчикам в своих приложениях выходить за пределы того, что возможно с помощью обычной синхронной модели программирования.
Я использую AsyncEnumerator уже более года и помог многим компаниям интегрировать его в их программное обеспечение с превосходными результатами. Загрузите код с wintellect.com/PowerThreading.aspx. Я надеюсь, что вы извлечете из него не меньше пользы, чем я.

Скачать исходники примеров кода 

Автор: Джеффри Рихтер (Jeffrey Richter)