お知らせ フロントエンド バックエンド インフラ 品質保証 セキュリティ 製品 興味・関心 その他

2025.03.10

インフラ

AWS Step Functionsで再帰呼び出し

AWS Step Functionsは小規模なマイクロサービスを「視覚的なワークフロー」でオーケストレーションし、自動化することができるサービスです。条件分岐、並列処理などの「フロー」が用意されていて、簡単なロジックをステートマシンで組むことができます。

また、Step Functionsから別のStep Functionsを呼び出し、複雑な処理を複数のStep Functionsに分割するといったテクニックもあるようです。ここで呼び出し先を別のStep Functionsではなく自分自身を呼び出す再帰呼び出しをすることが可能なのか、ということを試してみました。再帰呼び出しができるとアルゴリズムの幅が広がり、適用可能な課題の範囲が広がると考えられます。

例題

四則演算を行うシステムを作成します。

計算式は下のような木構造で与えられます。システムへの入力は右側のようなJSON形式で与えます。

{
  "operator": "-",
  "operand1": {
    "operator": "+",
    "operand1": 3,
    "operand2": {
      "operator": "*",
      "operand1": 3,
      "operand2": 2
    }
  },
  "operand2": {
    "operator": "+",
    "operand1": 1,
    "operand2": 1
  }
}

これは数式の

(3 + 3 x 2) - (1 + 1) = ?

を表します。ちなみに、逆ポーランド記法で書くと、木構造に対応した形に近くなり、

3 3 2 x + 1 1 + -

となります。

システムはJSON形式の数式を入力として受け取り、数式の計算結果を出力します。

システム構成

メインの処理はStep Functionsを使用します。Step Functionsでは四則演算ができないため、その部分のみLambda (Python)で記述します。

使用するAWSサービスは以上ですが、これらに付随してIAMロールが必要となります。

Lambda

以下の3つの情報

  • 演算子: operator
  • 数値1: operand1
  • 数値2: operand2

を入力すると、計算結果を返すだけのシンプルなプログラムです。数値1,2は「数値」のみを受け付けます。Lambdaのプログラムは下記が全てです。

def lambda_handler(event, context):
    if event["operator"] == "+":
        result = event["operand1"] + event["operand2"]
    elif event["operator"] == "-":
        result = event["operand1"] - event["operand2"]
    elif event["operator"] == "*":
        result = event["operand1"] * event["operand2"]
    elif event["operator"] == "/":
        result = event["operand1"] / event["operand2"]
    else:
        result = event["operand1"]
    return result

ステートマシン

Step Functionsのステートマシンを下のように組みました。再帰呼び出しをしますので、ステートマシンは木構造の1段分の処理のみを記述すればOKとなります。

ステートマシンの詳細は最後にASL(Amazon States Language)による記述を添付します。ここからはステートマシンの各要素について簡単に説明します。

ステートマシンを開始すると、という条件分岐を行います。ここでは下記の4パターンを判定します。

  1. Operand1、Operand2ともに、さらに計算が必要
  2. Operand1は計算が必要、Operator2は数値
  3. Operand1は数値、Operand2は計算が必要
  4. Operand1、Operand2ともに数値

例えば入力が

{
  "operator": "-",
  "operand1": 5,
  "operand2": 1
}

の場合は「4」に該当します。また、

{
  "operator": "-",
  "operand1": {
    "operator": "+",
    "operand1": 3,
    "operand2": 2
  },
  "operand2": 1
}

の場合は「2」に該当します。

1〜3の場合は、数値ではないOperandがありますのでそのまま計算することはできません。Operand部分を切り出して、自分自身を再帰呼び出し()してOperandの値を数値に変換します。上の例ではoperand1の計算が必要なので、

{
  "operator": "+",
  "operand1": 3,
  "operand2": 2
}

を入力として自分自身を再帰呼び出しして計算結果を得ます。

1〜3の場合は再帰呼び出しを実施後、Operand1、Operand2ともに数値に変換済みとなります。4の場合は最初から入力は数値です。この状態で四則演算Lambdaを呼び出すと、計算結果が得られます。

実行例1: 単純な例

まずは数値のみの入力の単純な例 "5 - 1 = ?" をシステムに適用します。

{
  "operator": "-",
  "operand1": 5,
  "operand2": 1
}

実行ログは以下のとおりです。

ステートマシン図の一番右のパスに該当しますので、再帰呼び出しをせず、Lambdaを呼び出し(Step名「Operand1とOperand2をそのまま演算」)て終了となります。最終ステップの”output“に計算結果の”4“が出力されています。実行時間は0.531秒となかなか高速です。

実行例2: Operandの片方に計算が必要

再帰呼び出しが必要な例 "1 - (3 + 2) = ?" を行います。

{
  "operator": "-",
  "operand1": 1,
  "operand2": {
    "operator": "+",
    "operand1": 3,
    "operand2": 2
  }
}

Operand2が数値ではなく式なので、再帰呼び出しが発生します。上のステートマシンの図では右から2番目のパスを通ります。

Step名:「Operand2を再帰で計算」でOperand2の部分を切り出して自分自身を再帰呼び出しして式を数値に変換しています。得られた数値と、もともと数値だったOperand1をLambdaに入力して計算し、最終結果”-4″が出力されました。

実行時間は子のStep Functionsの呼び出しに2秒程度かかっています。

参考までに、再帰呼び出し先の「子のStep Functions」のログは下のようになります。15:37:15.651には「子」の実行は終了していますが、親が結果を受け取っているのは15:37:17.388なので、親が子の終了を検出するのに2秒程度時間がかかっているようです。

実行例3: 複雑な例

さらに複雑な例として、木の高さが3レベルとなる例 "(3 + 3 x 2) - (1 + 1) = ?"を実行します。再帰呼び出し先のStep Functions(子)からさらに再帰呼び出し(孫)が発生します。

{
  "operator": "-",
  "operand1": {
    "operator": "+",
    "operand1": 3,
    "operand2": {
      "operator": "*",
      "operand1": 3,
      "operand2": 2
    }
  },
  "operand2": {
    "operator": "+",
    "operand1": 1,
    "operand2": 1
  }
}

実行結果は以下のとおりです。

この例ではOperand1、Operand2ともに式ですので二つの「子のStep Functions」を再帰呼び出ししています。上の図のID = 7とID = 9が該当しますが、これらはParallelの部品の中に配置していますので並列実行されています。両方の結果を待って、Lambdaの計算を実施しています。

Operand1の中のさらにOperand2が式なので、孫のStep Functionsがさらに再帰で呼び出されますが、トップのStep Functionsのログからは孫の呼び出しは見えません。孫の呼び出しを行なっているため、Operand1の方はさらに実行時間がかかっています。

全体の実行時間は5.502秒と、木の高さが高くなった分時間がかかっています。

実行性能の考察

Parallelの効果で木の同一レベルの処理は並列実行されるため、横方向の広がりに関しては、いくら複雑になったとしても実行時間に差は出ないと考えられます。

それに対し、木の高さが増えれば増えるほど実行時間に影響が出ます。

応用

今回のサンプルは四則演算をするという単純な例ですが、例えば

  • 数値の部分を固定ではなく、入力条件によってどこかのデータソースからデータをとってきたり、データベースから検索した結果とする
  • 演算子部分を複雑な処理にする
  • 演算子の入力を二つではなく、任意の入力数を取れるようにする

などの変更をすれば、複雑なアルゴリズムをシンプルなステートマシンで記述するといった活用方法が考えられます。Parallelによる並列実行とスケーラブルなAWSのインフラ資源により、「計算時間が木の横方向の複雑度には影響せず、木の高さのみに依存する」ため、システムの高速化をはかる可能性も開けます。

再帰呼び出しの危険性

最後に再帰呼び出しの危険性について書きます。

再帰呼び出しは自分自身を呼び出すということから、再帰を止める条件にバグがあると無限に再帰呼び出しが発生して、無限に増殖してしまうという恐ろしい事故が発生する可能性を持っています。

通常のプログラムで再帰呼び出しにバグがあった場合は、単にプロセスが暴走するというだけですのでプロセスをkillすれば対処は済みますが、Step Functionsの場合は無限にStep Functionsの実行が増殖しますので、どう止めて良いのか分かりません。おそらく何らかのQuotaに引っかかってどこかで停止するとは思いますが、実際、何のQuotaに最初に引っかかるのか試していないので分かりません。試したくもありません。

再帰を行う条件部分については十分注意して実装しましょう。

ステートマシンのASL

今回作成したシステムのステートマシンのASLです。

AWS SAMで作成していますので、ASL中の

  • ${StateMachineArn}
  • ${FunctionArn}

はSAMによって実際のARNに自動的に置換されます。

{
  "StartAt": "ネストチェック",
  "States": {
    "ネストチェック": {
      "Type": "Choice",
      "Choices": [
        {
          "Next": "両方ネスト",
          "Condition": "{% $states.input.operand1.operator != null and $states.input.operand2.operator != null %}"
        },
        {
          "Next": "Operand1を再帰で計算",
          "Condition": "{% $states.input.operand1.operator != null %}"
        },
        {
          "Next": "Operand2を再帰で計算",
          "Condition": "{% $states.input.operand2.operator != null %}"
        }
      ],
      "Default": "Operand1とOperand2をそのまま演算"
    },
    "両方ネスト": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "Operand1を再帰で計算(両方ネスト)",
          "States": {
            "Operand1を再帰で計算(両方ネスト)": {
              "Type": "Task",
              "Resource": "arn:aws:states:::states:startExecution.sync:2",
              "Arguments": {
                "StateMachineArn": "${StateMachineArn}",
                "Input": "{% $states.input.operand1 %}"
              },
              "End": true,
              "Output": {
                "operator": "{% $states.input.operator %}",
                "operand1": "{% $states.result.Output %}",
                "operand2": "{% $states.input.operand2 %}"
              }
            }
          }
        },
        {
          "StartAt": "Operand2を再帰で計算(両方ネスト)",
          "States": {
            "Operand2を再帰で計算(両方ネスト)": {
              "Type": "Task",
              "Resource": "arn:aws:states:::states:startExecution.sync:2",
              "Arguments": {
                "StateMachineArn": "${StateMachineArn}",
                "Input": "{% $states.input.operand2 %}"
              },
              "End": true,
              "Output": {
                "operator": "{% $states.input.operator %}",
                "operand1": "{% $states.input.operand1 %}",
                "operand2": "{% $states.result.Output %}"
              }
            }
          }
        }
      ],
      "Next": "再帰計算されたOperand1とOperand2で演算"
    },
    "再帰計算されたOperand1とOperand2で演算": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Output": "{% $states.result.Payload %}",
      "Arguments": {
        "FunctionName": "${FunctionArn}",
        "Payload": {
          "operator": "{% $states.input[0].operator %}",
          "operand1": "{% $states.input[0].operand1 %}",
          "operand2": "{% $states.input[1].operand2 %}"
        }
      },
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException",
            "Lambda.TooManyRequestsException"
          ],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2,
          "JitterStrategy": "FULL"
        }
      ],
      "End": true
    },
    "Operand1を再帰で計算": {
      "Type": "Task",
      "Resource": "arn:aws:states:::states:startExecution.sync:2",
      "Arguments": {
        "StateMachineArn": "${StateMachineArn}",
        "Input": "{% $states.input.operand1 %}"
      },
      "Output": {
        "operator": "{% $states.input.operator %}",
        "operand1": "{% $states.result.Output %}",
        "operand2": "{% $states.input.operand2 %}"
      },
      "Next": "再帰計算されたOperand1で演算"
    },
    "再帰計算されたOperand1で演算": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Output": "{% $states.result.Payload %}",
      "Arguments": {
        "FunctionName": "${FunctionArn}",
        "Payload": "{% $states.input %}"
      },
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException",
            "Lambda.TooManyRequestsException"
          ],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2,
          "JitterStrategy": "FULL"
        }
      ],
      "End": true
    },
    "Operand2を再帰で計算": {
      "Type": "Task",
      "Resource": "arn:aws:states:::states:startExecution.sync:2",
      "Arguments": {
        "StateMachineArn": "${StateMachineArn}",
        "Input": "{% $states.input.operand2 %}"
      },
      "Output": {
        "operator": "{% $states.input.operator %}",
        "operand1": "{% $states.input.operand1 %}",
        "operand2": "{% $states.result.Output %}"
      },
      "Next": "再帰計算されたOperand2で演算"
    },
    "再帰計算されたOperand2で演算": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Output": "{% $states.result.Payload %}",
      "Arguments": {
        "FunctionName": "${FunctionArn}",
        "Payload": "{% $states.input %}"
      },
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException",
            "Lambda.TooManyRequestsException"
          ],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2,
          "JitterStrategy": "FULL"
        }
      ],
      "End": true
    },
    "Operand1とOperand2をそのまま演算": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Output": "{% $states.result.Payload %}",
      "Arguments": {
        "FunctionName": "${FunctionArn}",
        "Payload": "{% $states.input %}"
      },
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException",
            "Lambda.TooManyRequestsException"
          ],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2,
          "JitterStrategy": "FULL"
        }
      ],
    }
  },
  "QueryLanguage": "JSONata",
  "Comment": "Step Functions recursive call sample"
}

SAMテンプレート

システム全体を作成するSAMテンプレートも以下に添付します。

AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: >
  Step Functionsの再帰呼び出しサンプル

Parameters:
  # 自分自身で循環参照(Circular Dependency)になるため、
  # Step Functionsの名称はCloudFormationで自動生成できない
  StateMachineName:
    Description: Step Functions Name
    Type: String
    Default: RecursiveSample

Resources:
  StateMachine:
    Type: AWS::Serverless::StateMachine
    Properties:
      Name: !Ref StateMachineName
      DefinitionUri: statemachine/recursive.asl.json
      DefinitionSubstitutions:
        FunctionArn: !GetAtt CalculateFunction.Arn
        StateMachineArn: !Sub "arn:aws:states:${AWS::Region}:${AWS::AccountId}:stateMachine:${StateMachineName}"
      Role: !GetAtt StateMachineRole.Arn

  StateMachineRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service: states.amazonaws.com
            Action: sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: CloudWatchLogsDeliveryFullAccessPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogDelivery
                  - logs:GetLogDelivery
                  - logs:UpdateLogDelivery
                  - logs:DeleteLogDelivery
                  - logs:ListLogDeliveries
                  - logs:PutResourcePolicy
                  - logs:DescribeResourcePolicies
                  - logs:DescribeLogGroups
                Resource: "*"
        - PolicyName: LambdaInvokePolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction
                Resource:
                  - !GetAtt CalculateFunction.Arn
        - PolicyName: StepFunctionsStartExecutionPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - states:StartExecution
                  - states:DescribeExecution
                  - states:StopExecution
                Resource:
                  # StateMachineと循環参照になるためNG
                  # - !GetAtt StateMachine.Arn
                  - !Sub "arn:aws:states:${AWS::Region}:${AWS::AccountId}:stateMachine:${StateMachineName}"
              - Effect: Allow
                Action:
                  - events:PutTargets
                  - events:PutRule
                  - events:DescribeRule
                Resource:
                  - !Sub "arn:aws:events:${AWS::Region}:${AWS::AccountId}:rule/StepFunctionsGetEventsForStepFunctionsExecutionRule"

  CalculateFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: functions/
      Handler: calculate.lambda_handler
      Runtime: python3.12
      Architectures:
        - arm64

  FunctionLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub "/aws/lambda/${CalculateFunction}"
      RetentionInDays: 7

Outputs:
  StateMachineArn:
    Description: "State machine ARN"
    Value: !Ref StateMachine
  StateMachineRoleArn:
    Description: "IAM Role created for State machine"
    Value: !GetAtt StateMachineRole.Arn
工作クラブ

工作クラブ

記事一覧

「マーケライズ工作クラブ」で役に立つものを作り、その過程で新しい技術を習得してスキルアップしていきましょう。