ようこそ。睡眠不足なプログラマのチラ裏です。

Retry Monad for Transient Fault Handling (Topaz + FSharpx)


4月14日に札幌で行われた第69回CLR/H勉強会にて、「Retry Monad for Transient Fault Handling - F#とWindows Azure と私 -」と題して、ライトニングトークで発表しました。


Microsoft Enterprise Library 5.0 Integration Pack for Windows Azure(EL5 for Azure)のTopaz および FSharpx を利用してモナドを作りました。Topazを利用する理由は、再利用可能な再試行戦略およびWindows Azure向けの検出戦略が組み込み済みであり、それをそのまま利用したいからです。EL5 for AzureはOSSなので、どのような実装がなされているか実際に確認することができるので、すべてをF#で書き直すこともできますが、それでは車輪の再発明になってしまいます。Retry Monad for Transient Fault Handling は、一時的障害が発生するかもしれない計算について、それぞれ異なるRetryPolicyを適用しながら再試行処理を行います。一時的な障害に対するリトライ処理をひとつの計算として包括的に扱うことができるモナド実装です。このRetryモナドの計算結果は、Choice<’T1,’T2>型で得ることができ、これによりFSharpxで定義済みの Eitherモナドで扱うこともできます。



Retry Monad for Transient Fault Handling

namespace Monad.Retry 
open System

[<AutoOpen>]
module Retry =
  // #r "Microsoft.Practices.TransientFaultHandling.Core"
  // #r "FSharpx.Core"
  open Microsoft.Practices.TransientFaultHandling
  open FSharpx
 
  [<Sealed>]
  type TransientErrorCatchAllStrategy () =
    interface ITransientErrorDetectionStrategy with
      member this.IsTransient (ex : exn)  = true

  [<Sealed>]
  type TransientErrorIgnoreStrategy () =
    interface ITransientErrorDetectionStrategy with
      member this.IsTransient (ex : exn)  = false

  let defaultRetryStrategyName = "DefaultRetry"
  let defaultRetryCount = 3
  let defaultMinBackoff = TimeSpan.FromSeconds(3.0)
  let defaultMaxBackoff = TimeSpan.FromSeconds(90.0)
  let defaultDeltaBackoff = TimeSpan.FromMilliseconds(30.0)

  let (<+) (rp:RetryPolicy<'TResultStrategy>) retrying = rp.Retrying |> Event.add(retrying)

  type RetryPolicies =
    static member NoRetry() = new RetryPolicy<TransientErrorIgnoreStrategy>(0, TimeSpan.Zero)
    static member Retry<'TTransientErrorCatchStrategy when 'TTransientErrorCatchStrategy : (new :unit -> 'TTransientErrorCatchStrategy) and 'TTransientErrorCatchStrategy :> ITransientErrorDetectionStrategy>(retryCount : int , retryInterval : TimeSpan) : RetryPolicy<'TTransientErrorCatchStrategy> =
      new RetryPolicy<'TTransientErrorCatchStrategy>(retryCount, retryInterval)
    static member Retry<'TTransientErrorCatchStrategy when 'TTransientErrorCatchStrategy : (new :unit -> 'TTransientErrorCatchStrategy) and 'TTransientErrorCatchStrategy :> ITransientErrorDetectionStrategy>(retryCount : int , initialInterval : TimeSpan, increment : TimeSpan) : RetryPolicy<'TTransientErrorCatchStrategy> =
      new RetryPolicy<'TTransientErrorCatchStrategy>(retryCount, initialInterval, increment)
    static member Retry<'TTransientErrorCatchStrategy when 'TTransientErrorCatchStrategy : (new :unit -> 'TTransientErrorCatchStrategy) and 'TTransientErrorCatchStrategy :> ITransientErrorDetectionStrategy>(retryStrategy : RetryStrategy) : RetryPolicy<'TTransientErrorCatchStrategy> =
      new RetryPolicy<'TTransientErrorCatchStrategy>(retryStrategy)
    static member RetryExponential<'TTransientErrorCatchStrategy when 'TTransientErrorCatchStrategy : (new :unit -> 'TTransientErrorCatchStrategy) and 'TTransientErrorCatchStrategy :> ITransientErrorDetectionStrategy>(retryCount : int , deltaBackoff : TimeSpan) : RetryPolicy<'TTransientErrorCatchStrategy> =
      let retryStrategy = new ExponentialBackoff(defaultRetryStrategyName, retryCount, defaultMinBackoff, defaultMaxBackoff , deltaBackoff)
      new RetryPolicy<'TTransientErrorCatchStrategy>(retryStrategy)
    static member RetryExponential<'TTransientErrorCatchStrategy when 'TTransientErrorCatchStrategy : (new :unit -> 'TTransientErrorCatchStrategy) and 'TTransientErrorCatchStrategy :> ITransientErrorDetectionStrategy>(retryCount : int , minBackoff : TimeSpan, maxBackoff : TimeSpan, deltaBackoff : TimeSpan) : RetryPolicy<'TTransientErrorCatchStrategy> =
      let retryStrategy = new ExponentialBackoff(defaultRetryStrategyName, retryCount, minBackoff, maxBackoff, deltaBackoff)
      new RetryPolicy<'TTransientErrorCatchStrategy>(retryStrategy)
    static member RetryDefault(?retryCount : int) : RetryPolicy<TransientErrorCatchAllStrategy>=
      let retryCount = defaultArg retryCount defaultRetryCount
      RetryPolicies.RetryExponential<TransientErrorCatchAllStrategy>(retryCount, defaultMinBackoff, defaultMaxBackoff, defaultDeltaBackoff)

  type Retry<'TResult> = Retry of (Lazy<unit -> 'TResult * LastException option>)
  and RetryResult<'TResult> = Choice<'TResult, LastException>
  and LastException = exn

  let exnHandler e = Retry(lazy(fun () -> Unchecked.defaultof<'TResult>, e |> Some))    
  type RetryBuilder (policy : RetryPolicy) = 
    new(?retryCount : int, ?retrying) = 
      let policy = 
        let retryCount = defaultArg retryCount defaultRetryCount
        RetryPolicies.RetryDefault(retryCount)

      retrying |> function
      | None   -> policy <+ (fun e -> printfn "%s" (sprintf "RetryPolicyName:%s, CurrentRetryCount:%d, LastException.Message:%s, Delay:%A" 
                                                            policy.RetryStrategy.Name e.CurrentRetryCount e.LastException.Message e.Delay))
      | Some retrying ->policy <+ retrying
      RetryBuilder(policy)
    
    member this.Bind (m : Retry<'TResult>, bind : ('TResult) -> Retry<'UResult>) : Retry<'UResult> = 
      Retry(lazy(fun () -> 
        m |> function
        | Retry f -> f.Force() |> fun cont -> 
          cont() ||> fun r _ -> r |> bind
        |> function
          | Retry f -> f.Force() 
          |> fun cont -> policy.ExecuteAction(Func<_>(fun () -> cont() ||> fun r _ -> r,None))))
    member this.Return (value : 'TResult) : Retry<'TResult> = 
      Retry(lazy (fun () -> policy.ExecuteAction(L.F<_>(fun () ->  value, None))))
    member this.ReturnFrom (m : Retry<'TResult>) : Retry<'TResult> = 
      m
    member this.Delay (f: unit -> Retry<unit -> 'TResult>)  : Retry<unit -> 'TResult> = 
      Retry(lazy (fun () -> policy.ExecuteAction(L.F<_>(fun () -> f() |> function | Retry f -> f.Force() |> fun cont -> cont() ||> fun f _ -> f(), None)) ||> fun r _ ->  (fun () -> r), None))
    member this.Zero () : Retry<'TResult> = 
      this.Return(Unchecked.defaultof<'TResult>)
    member this.Combine(comp1:Retry<'TResult>, comp2:Retry<'TResult>) = 
      this.Bind(comp1,(fun r -> comp2))

  let retry = new RetryBuilder()

  open Operators
  let inline returnM x = returnM retry x 
  let inline (>>=) m f = bindM retry m f
  let inline (=<<) f m = bindM retry m f
  let inline (<*>) f m = applyM retry retry f m
  let inline ap m f = f <*> m
  let inline map f m = liftM retry f m
  let inline (<!>) f m = map f m
  let inline lift2 f a b = returnM f <*> a <*> b
  let inline ( *>) x y = lift2 (fun _ z -> z) x y
  let inline ( <*) x y = lift2 (fun z _ -> z) x y
  let inline (>>.) m f = bindM retry m (fun _ -> f)
  let inline (>=>) f g = fun x -> f x >>= g
  let inline (<=<) x = flip (>=>) x

  let (|RetryResult|) = 
    let rec result (r:RetryResult<'TResult>) =
      match r with
      | Choice1Of2 v -> v, None
      | Choice2Of2 e -> Unchecked.defaultof<'TResult>, Some(e)
    result

  let run (retryCont : Retry<unit -> 'TResult>) : RetryResult<'TResult> =
    try
      retryCont |> function
      |(Retry f) -> f.Force()() ||> fun r e -> 
        e |> function
        |Some e -> e |> Choice2Of2
        |None   -> r() |> Choice1Of2
    with e -> e |> Choice2Of2



一時的な障害:Windows Azure(クラウド)アプリケーションを開発するにあたって対処しなければならない課題のひとつ

他のクラウドサービスに依存するようなクラウドアプリケーションを開発する場合に開発者が対処しなければならない課題の一つに、“一時的な障害” があります。インフラストラクチャレベルの障害であったり、ネットワークの問題など一時的な条件のために発生する恐れのある障害のことです。この一時的に発生しうる障害は、ほとんどの場合は短い間隔で(ほんの数ミリ秒後に)リトライ処理を行うことで回避することができます。


たとえば、Windows AzureSQL Azureプラットフォームを利用する場合。SQL Azureサービスは、共有リソース上で大規模なマルチテナントデータベースとしてサービスが提供されるので、データベースを利用するすべての利用者に対して良好なエクスペリエンスを提供しなければなりません。そのため、SQL Azureは過剰なリソースの使用や、実行時間の長いトランザクションの発行された場合など、さまざまな理由でサービスへの接続数を抑制して、利用者が意図しないタイミングで接続を切断することがあります。これが、SQL Azureを利用した場合に生じる一時的な障害ということになります。このような障害が発生した場合であってもシームレスなユーザーエクスペリエンスを提供するために、Windows Azureアプリケーション(クラウドアプリケーション)では、一時的な障害によって処理が中断された場合にはリトライを試みるようにアプリケーションを実装する必要があります。


Microsoft Enterprise Library 5.0 Integration Pack for Windows Azureを利用する

一時的な障害に対応するアプリケーションを実装する場合、Microsoft Enterprise Library 5.0 Integration Pack for Windows Azure(以降 EL5 for Azure)を利用するのが有効です。EL5 for Azureは、マイクロソフトの pattern & practice チームによる、マイクロソフト製品やテクノロジを基として、アプリケーションを構築する上でのパターンやベストプラクティスを集めたライブラリの Windows Azure向けの拡張パックです。この拡張ライブラリが提供されるまでは、一時的障害を検知してリトライ処理を行う実装を開発者自身がおのおので組み込まなければなりませんでした。EL5 for Azureには、Transient Fault Handling Application Block (Topaz)という、Windows Azureのプラットフォームに含まれるサービス利用時に発生するさまざまな一時的な障害からWindows Azureアプリケーションを回復させるためのアプリケーションブロックが提供されています。これは、Windows Azure固有の一時的な障害のみならず、オンプレミスアプリケーションで発生するさまざまな一時的な障害に対するリトライ処理についても利用可能なように設計されており、リトライ処理について高いレベルで抽象化されたアプリケーションブロックです(Microsoft.Practices.TransientFaultHandling.Core.dllにまとめらえている)。特に、Windows Azureに特化した組み込みの実装については、SQL AzureWindows Azure ストレージサービス、Windows Azure サービスバス、Windows Azure キャッシングサービス向けの検出戦略がそれぞれ提供されていて、Microsoft.Practices.EnterpriseLibrary.WindowsAzure.TransientFaultHandling.dllに含まれています。



検出戦略と再試行戦略

検出戦略は、ITransientErrorDetectionStrategyインターフェイスを実装して作成することができます。

public interface ITransientErrorDetectionStrategy
{
    bool IsTransient(Exception ex);
}

例外を引数で受け取り、その例外の種類や内部的なメッセージなどを判断して、リトライ処理を行うときは true、 リトライをせずに無視するときは falseを返すように実装するだけの非常にシンプルなインターフェイスです。Windows Azureの一時的な障害に対する4つの組み込み検出戦略として、SqlAzureTransientErrorDetectionStrategy、StorageTransientErrorDetectionStrategy、ServiceBusTransientErrorDetectionStrategy、CacheTransientErrorDetectionStrategyが提供されています。




再試行戦略は、RetryStrategy抽象クラスを継承して作成することができます。

    public abstract class RetryStrategy
    {
        public static readonly int DefaultClientRetryCount = 10;
        public static readonly TimeSpan DefaultClientBackoff = TimeSpan.FromSeconds(10.0);
        public static readonly TimeSpan DefaultMaxBackoff = TimeSpan.FromSeconds(30.0);
        public static readonly TimeSpan DefaultMinBackoff = TimeSpan.FromSeconds(1.0);
        public static readonly TimeSpan DefaultRetryInterval = TimeSpan.FromSeconds(1.0);
        public static readonly TimeSpan DefaultRetryIncrement = TimeSpan.FromSeconds(1.0);
        public static readonly bool DefaultFirstFastRetry = true;

        public static readonly RetryStrategy NoRetry = new FixedInterval(0, DefaultRetryInterval);
        public static readonly RetryStrategy DefaultFixed = new FixedInterval(DefaultClientRetryCount, DefaultRetryInterval);
        public static readonly RetryStrategy DefaultProgressive = new Incremental(DefaultClientRetryCount, DefaultRetryInterval, DefaultRetryIncrement);
        public static readonly RetryStrategy DefaultExponential = new ExponentialBackoff(DefaultClientRetryCount, DefaultMinBackoff, DefaultMaxBackoff, DefaultClientBackoff);

        protected RetryStrategy(string name, bool firstFastRetry)
        {
            this.Name = name;
            this.FastFirstRetry = firstFastRetry;
        }

        public bool FastFirstRetry { get; set; }
        public string Name { get; private set; }
        public abstract ShouldRetry GetShouldRetry();
    }


基本的な実装は、GetShouldRetryメソッドをオーバーライドし、リトライすべきタイミングか否かを表すShouldRetry デリゲートを返すように実装します。

public delegate bool ShouldRetry(int retryCount, Exception lastException, out TimeSpan delay);


ShouldRetry デリゲートは、リトライする回数と最後に発生した例外およびリトライを行うタイミングの遅延間隔を受け取り、リトライ処理を行うべきタイミングか否かを返します。組み込みで、Incremental(再試行と再試行間の増分の時間間隔数を制御する戦略)、FixedInterval(再試行と一定間隔の再試行間を制御する戦略)、ExponentialBackoff(指数関数的な遅延を計算するためのバックオフ戦略)が提供されています。



Transient Fault Handling Application Block (Topaz)によるリトライ処理の基本的な利用方法


Transient Fault Handling Application Block (Topaz)による基本的な利用方法(C#)は、検出戦略と再試行戦略を組み合わせて、RetryPolicyオブジェクトを作成し、そのRetryPolicyオブジェクトにリトライ中の処理を適宜設定し、RetryPolicyオブジェクトのExecuteActionメソッドを呼び出します。ExecuteActionメソッドへは、リトライを行いたい対象の処理を継続渡しスタイルで渡します。

var strategy = new Incremental("Incr1",10, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
var policy = new RetryPolicy<SqlAzureTransientErrorDetectionStrategy>(strategy);

policy.Retrying += (_, e) =>
{
	Console.WriteLine("{0:HH:mm:ss.fff} RetryCount: {1}, ErrorMessage: {2}, StackTrace: {3}",
	    DateTime.Now,
	    e.CurrentRetryCount,
	    e.LastException.Message,
	    e.LastException.StackTrace);
};

var result = policy.ExecuteAction(() =>
{
	// SQL Azureへごにょごにょ

	return "クエリの結果などを返す";
});

EL5 for Azureはオブジェクト指向プログラミングで書かれているライブラリ、FSharpxは関数プログラミングで書かれているライブラリです。これら異なるパラダイムの部品を組み合わせてモナドを作る。とっても面白いですね。



モナドとは

モナドは単なる自己関手の圏におけるモノイド対象だよ。何か問題でも? - フィリップ・ワドラー


圏論を少しかじったことがある人にとっては問題ない説明なのですが、そうではない場合「日本語でおk」と言わざるを得ません。
この説明だけでは少々乱暴すぎるので、MSDN - コンピューテーション式(F#)へのリンクと、F#とモナドの関係について参考になりそうな表を置いておきます。


コンピュテーション式 (F#)
http://msdn.microsoft.com/ja-jp/library/dd233182(v=vs.110).aspx


Haskell F# 数学(圏論)
return return η(単位元:unit)
>>= bind (*)operator
型クラスMonadインスタンスであるように実装する コンピューテーション式で少なくとも Return と Bind の2つのmemberを実装する NA
Monad Computation Expression, Workflow モナドはKleisliトリプルと等価な定義。F# と Haskell の中で定義されるモナドの構造は実際にKleisliトリプル。
functor through a type class definition usually not mentioned 関手(functor)
function function (fun) 射(morphism)
Haskellのデータ型のHask圏 .Netデータ型の圏 グループ、位相、グラフ、微分幾何学
composable functions composable functions 2項演算とモノイド

MSDN - Code Recipe - F#によるモナドの実装方法とモナド則を確認するユニットテスト。 Retry Monad for Transient Fault Handling

モナド則を確認するためのユニットテスト等を含む、このプログラムコードのソリューションファイル一式を、MSDN - Code Recipe よりダウンロードすることができます。

http://code.msdn.microsoft.com/F-Retry-Monad-for-35ee1e72


関連記事
快刀乱麻を断つモナド - F#とIOモナドとコンピューテーション式の奥義と
http://d.hatena.ne.jp/zecl/20110703/p1