Using Python to access Amazon Simple Workflow (SWF)

Posted by Edgar Roman

Purpose

I started experimenting with Python and boto to communicate with Amazon Web Services' new Simple Workflow (SWF) service, and I wanted to share my experiences and provide examples. Note that support for SWF in boto is still in beta at the time of this writing.

The goal is to illustrate the concepts using an extremely simple workflow that uploads a file to an S3 bucket.

Setup

For this exercise, I wanted a very basic workflow with a single activity: uploading a file to an S3 bucket. To do this, a number of other things need to be set up first:

  1. Log into the AWS console and navigate to SWF
  2. Create a new domain called 'UploadTest'
  3. Register a new Workflow (Workflow Types --> Workflow Actions --> Register New)
    • Workflow Type Name: FileIngest
    • Workflow Type Version: 1
    • Default Task List: MainTaskList
    • Default Execution Start to Close Timeout: 1 hour
    • Default Task Start to Close Timeout: 1 hour
  4. Register a new Activity (Activity Types --> Activity Actions --> Register New)
    • Activity Type Name: UploadToS3Archive
    • Activity Type Version: 1
    • Task List: NewFiles
  5. Create an IAM user and get its access key id and secret
    • Restrict the permissions of the user to only allow SWF access:
      {
        "Version": "2008-10-17",
        "Statement": [{
          "Effect": "Allow",
          "Action": ["swf:*"],
          "Resource": ["*"]
        }]
      }
  6. Start coding!
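
Steps 2 through 4 above can also be scripted instead of clicked through in the console. The following is a minimal sketch, not a tested recipe: it assumes `conn` behaves like `boto.swf.layer1.Layer1` and that its `register_domain`, `register_workflow_type` and `register_activity_type` methods take the arguments shown, so check them against your boto version. The function name `register_swf_types` and the 7-day retention period are hypothetical choices. The one-hour timeouts come from the list above, and the child policy and the 21600-second activity timeout are copied from the sample output at the end of this post.

```python
def register_swf_types(conn):
    """Register the domain, workflow type and activity type used in this post.

    `conn` is assumed to expose the same methods as boto.swf.layer1.Layer1.
    """
    # Retention period in days is an illustrative value, not from the post.
    conn.register_domain('UploadTest', '7')

    # One hour = 3600 seconds, matching the console settings in step 3.
    conn.register_workflow_type(
        'UploadTest', 'FileIngest', '1',
        task_list='MainTaskList',
        default_child_policy='TERMINATE',
        default_execution_start_to_close_timeout='3600',
        default_task_start_to_close_timeout='3600')

    # The start-to-close default mirrors the value seen in the sample output.
    conn.register_activity_type(
        'UploadTest', 'UploadToS3Archive', '1',
        task_list='NewFiles',
        default_task_start_to_close_timeout='21600')
```

SWF rejects an attempt to register a domain or type name that already exists, so this would be a one-time step rather than something to run on every start.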

Now you can use the Python code below as a guide to start using your workflow.

Code

Register a workflow execution

This step essentially kicks off the execution (I like to think of executions as jobs) and starts the whole process. At the end of this step, there will be a new execution waiting for a decision.

import boto.swf.layer1  # explicit import so boto.swf.layer1.Layer1 resolves
from boto.swf.layer1_decisions import Layer1Decisions
import random
import json

aws_id = ''
aws_secret = ''

domain = 'UploadTest'
workflow_type = 'FileIngest'
workflow_version = '1'
default_task_list = 'MainTaskList'
task_body = '{ "file" : "testfile", "url" : "http://staging.pbs.org/uploads/sherlock.mp4" }'
#------------------------------------------------------------------
# Establish connection
wf = boto.swf.layer1.Layer1(aws_access_key_id=aws_id,aws_secret_access_key=aws_secret)
#------------------------------------------------------------------
# Generate a workflow id. This will be the 'name' of the execution
# throughout the whole process. So attached to this workflow id will
# be all the decisions, activities, and history until the execution
# is completed. This is akin to the name of a job
workflow_id = 'job-%s' % int(random.random() * 10000)
print '===> starting workflow: %s' % workflow_id
# The task list argument specifies the starting task list for the
# execution. In this case, we want the execution to start in the
# 'MainTaskList' queue
out = wf.start_workflow_execution(domain,             # Domain
                                  workflow_id,        # Workflow id
                                  workflow_type,      # Workflow name
                                  workflow_version,   # Workflow version
                                  default_task_list,  # Task list
                                  input=task_body)    # Data for the execution
print '===> start_workflow_execution returned %s' % out
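
The random suffix used for `workflow_id` above has only 10,000 possible values, and SWF refuses to start a second open execution that reuses a workflow id within the same domain. A small sketch of a less collision-prone alternative, using the standard `uuid` module, follows; the helper name `make_workflow_id` is hypothetical.

```python
import uuid


def make_workflow_id(prefix='job'):
    """Return an id such as 'job-<32 hex chars>' that is very unlikely to repeat."""
    return '%s-%s' % (prefix, uuid.uuid4().hex)


workflow_id = make_workflow_id()
print('===> starting workflow: %s' % workflow_id)
```

The trade-off is readability: a UUID is harder to spot in the console than `job-2444`, so a timestamp or file name in the prefix may be worth adding.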

Make a Decision

Poll for a decision task and respond with a decision. This must be done for every new execution to determine what the first activity should be.

#------------------------------------------------------------------
# Poll for Decision Task
# Ask SWF if there are any decisions that need to be made for executions (jobs)
# in the start task list
print '===> polling for decision...'
decision = wf.poll_for_decision_task(domain, default_task_list)
print '===> got decision: \n%s' % json.dumps(decision, sort_keys=True, indent=4)
# extract token. We'll need this to communicate to SWF after we've
# made our decisions
token = decision['taskToken']
print '===> token = %s' % token

#------------------------------------------------------------------
# Insert your own code to determine what activity comes next.
# This could mean that you walk through the execution
# history to see what the last activity was and then pick the next
#
# In this simple example, we don't have any real decision to make -
# we just need to create an activity so a worker can upload the file
# to the S3 Archive
activity_id = 'upload-task-%s' % int(random.random() * 10000)
activity_type = 'UploadToS3Archive'
print '===> scheduling task: %s' % activity_id
d = Layer1Decisions()
d.schedule_activity_task(activity_id,                 # Activity ID
                         activity_type,               # Activity Type
                         '1',                         # Activity Type Version
                         None,                        # Task List (use default)
                         'control data',              # control
                         '300',                       # heartbeat_timeout in seconds
                         '300',                       # schedule_to_close_timeout
                         '300',                       # schedule_to_start_timeout
                         None,                        # start_to_close_timeout (use default)
                         'name of bucket to upload')  # input passed to the activity
#------------------------------------------------------------------
# Complete Decision Task
# - easy enough
out = wf.respond_decision_task_completed(token,d._data)
print '===> respond_decision_task_completed returned %s' % out
#------------------------------------------------------------------
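
The "insert your own code" step in the comments above can be sketched as a function that walks the `events` list returned by the poll and picks the next move. This is only an illustration for the single-activity workflow in this post: the function name `next_step` and its return values are hypothetical, and a real decider would usually look at more than the event types.

```python
def next_step(events):
    """Decide what to do next from a workflow's event history.

    Returns 'schedule' if no activity has been scheduled yet,
    'complete' once the activity has finished, and 'fail' if it
    failed or timed out.
    """
    event_types = [event['eventType'] for event in events]
    if 'ActivityTaskCompleted' in event_types:
        return 'complete'
    if 'ActivityTaskFailed' in event_types or 'ActivityTaskTimedOut' in event_types:
        return 'fail'
    return 'schedule'


# Event types as they appear in the first poll of a new execution.
first_poll = [
    {'eventId': 1, 'eventType': 'WorkflowExecutionStarted'},
    {'eventId': 2, 'eventType': 'DecisionTaskScheduled'},
    {'eventId': 3, 'eventType': 'DecisionTaskStarted'},
]
print(next_step(first_poll))
```

With a function like this, the same polling loop can serve both decision points: schedule the upload when it returns 'schedule', and close the execution when it returns 'complete'.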

Do the Activity

Poll for an activity task and respond with its completion. This is where the work gets done.

#------------------------------------------------------------------
# Poll for Activity Task
# Ask SWF if there are any activities waiting to be performed for
# executions (jobs) that need files uploaded to S3
task_list_for_s3upload = 'NewFiles'
print '===> polling for activity...'
activity_task = wf.poll_for_activity_task(domain, task_list_for_s3upload)
print '===> got activity: \n%s' % json.dumps(activity_task, sort_keys=True, indent=4)
token = activity_task['taskToken']

#------------------------------------------------------------------
# Do the actual activity task here, such as uploading the file to S3.
# - The name of the bucket is in the 'input' field of the activity
#   task returned by the poll

#------------------------------------------------------------------
# Complete Activity Task
task_status = 'All done - thanks for coming'
out = wf.respond_activity_task_completed(token,task_status)
print '==> respond_activity_task_completed returned %s' % out
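
The snippet above assumes that the poll always returns a task and that the work always succeeds. SWF polls are long polls, and a poll that times out comes back without a `taskToken`, so a worker usually has to handle that case and also report failures. A hedged sketch of one worker iteration follows. It assumes `conn` behaves like the `wf` connection above and that `respond_activity_task_failed` accepts a `reason` keyword, which should be checked against your boto version. The names `handle_one_activity` and `do_work` are hypothetical.

```python
def handle_one_activity(conn, domain, task_list, do_work):
    """Poll once for an activity task and report its outcome to SWF.

    `do_work` is a callable that receives the task's input string and
    returns a result string. Returns None if the poll produced no task,
    otherwise 'completed' or 'failed'.
    """
    task = conn.poll_for_activity_task(domain, task_list)
    token = task.get('taskToken')
    if not token:
        # The long poll timed out without a task; the caller can poll again.
        return None
    try:
        result = do_work(task.get('input'))
    except Exception as exc:
        # Tell SWF the task failed so the decider can react to it.
        conn.respond_activity_task_failed(token, reason=str(exc)[:256])
        return 'failed'
    conn.respond_activity_task_completed(token, result)
    return 'completed'
```

A worker process would call this in a loop, for example `handle_one_activity(wf, domain, 'NewFiles', upload_file)`, where `upload_file` is whatever function performs the S3 upload. Note that the IAM policy in the setup section only grants SWF access, so the upload itself would need S3 permissions from somewhere else.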

Close the execution

Finally, we have to poll for another decision, discover that the activity completed successfully, and then tell SWF to close the execution. The polling code is identical to the snippet above, but this time the decision logic concludes that we are finished.

#------------------------------------------------------------------
# Poll for Decision Task
# Ask SWF if there are any decisions that need to be made for executions (jobs)
# in the start task list
print '===> polling for decision...'
decision = wf.poll_for_decision_task(domain, default_task_list)
print '===> got decision: \n%s' % json.dumps(decision, sort_keys=True, indent=4)
# extract token.  We'll need this to communicate to SWF after we've
# made our decisions
token = decision['taskToken']
print '===> token = %s' % token

d = Layer1Decisions()
d.complete_workflow_execution()
#------------------------------------------------------------------
# Complete Decision Task
out = wf.respond_decision_task_completed(token,d._data)
print '===> respond_decision_task_completed returned %s' % out
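
Both decision snippets pass `d._data`, a private attribute of `Layer1Decisions`, to `respond_decision_task_completed`. As far as the SWF API is concerned, a decision list is just a list of dictionaries, each with a `decisionType` and a matching `...DecisionAttributes` entry. The sketch below builds that structure by hand to show what is being sent. The helper names are hypothetical, and whether boto accepts such a hand-built list in place of `d._data` is an assumption to verify.

```python
import json


def schedule_activity_decision(activity_id, name, version, task_input=None):
    """Build a ScheduleActivityTask decision as a plain dictionary."""
    attrs = {
        'activityId': activity_id,
        'activityType': {'name': name, 'version': version},
    }
    if task_input is not None:
        attrs['input'] = task_input
    return {
        'decisionType': 'ScheduleActivityTask',
        'scheduleActivityTaskDecisionAttributes': attrs,
    }


def complete_workflow_decision(result=None):
    """Build a CompleteWorkflowExecution decision as a plain dictionary."""
    attrs = {}
    if result is not None:
        attrs['result'] = result
    return {
        'decisionType': 'CompleteWorkflowExecution',
        'completeWorkflowExecutionDecisionAttributes': attrs,
    }


decisions = [complete_workflow_decision('file archived')]
print(json.dumps(decisions, sort_keys=True, indent=4))
```

Printing the list this way is also a handy debugging aid when SWF rejects a decision.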
 

Sample Output

Here is the sample output from running the script above:

python swf-test.py
===> starting workflow: job-2444
===> start_workflow_execution returned {u'runId': u'646ef96c-21db-4195-9c62-e33993c9e284'}
===> polling for decision...
===> got decision:
{
    "events": [
        {
            "eventId": 1,
            "eventTimestamp": 1339460909.346,
            "eventType": "WorkflowExecutionStarted",
            "workflowExecutionStartedEventAttributes": {
                "childPolicy": "TERMINATE",
                "executionStartToCloseTimeout": "3600",
                "input": "{ \"file\" : \"testfile\", \"url\"  : \"http://staging.pbs.org/uploads/sherlock.mp4\" }",
                "parentInitiatedEventId": 0,
                "taskList": {
                    "name": "MainTaskList"
                },
                "taskStartToCloseTimeout": "3600",
                "workflowType": {
                    "name": "FileIngest",
                    "version": "1"
                }
            }
        },
        {
            "decisionTaskScheduledEventAttributes": {
                "startToCloseTimeout": "3600",
                "taskList": {
                    "name": "MainTaskList"
                }
            },
            "eventId": 2,
            "eventTimestamp": 1339460909.346,
            "eventType": "DecisionTaskScheduled"
        },
        {
            "decisionTaskStartedEventAttributes": {
                "scheduledEventId": 2
            },
            "eventId": 3,
            "eventTimestamp": 1339460909.478,
            "eventType": "DecisionTaskStarted"
        }
    ],
    "previousStartedEventId": 0,
    "startedEventId": 3,
    "taskToken": "AAAAKgAAAAEAAAAAAAAAAQT2BHgH6Tsy3pxbCESvJerq3DnuogS+8AmtIRVD/eMnCvDwLbZGRRpHwa8vV/vmrmHd7qeLid/FpxlB8886kUxROuz5y893j5KNHQ0DUjpCvuSs87uoA4Iv1KhoHo4jMrSLhsXgdQ3Ey8Ai4r2X0e/yUBgBhXvQrQYNrHJdy6nBYUE/flZ9jluyUYB7lkMeRpfpgq1kZF7z6sUNFAeyOZTuWwNYiTAEiWeE9iqum+w5z973xVB7HPCygV1oQfUqRChsFmFTOlTgxuYqWlyE6Xg=",
    "workflowExecution": {
        "runId": "646ef96c-21db-4195-9c62-e33993c9e284",
        "workflowId": "job-2444"
    },
    "workflowType": {
        "name": "FileIngest",
        "version": "1"
    }
}
===> token = AAAAKgAAAAEAAAAAAAAAAQT2BHgH6Tsy3pxbCESvJerq3DnuogS+8AmtIRVD/eMnCvDwLbZGRRpHwa8vV/vmrmHd7qeLid/FpxlB8886kUxROuz5y893j5KNHQ0DUjpCvuSs87uoA4Iv1KhoHo4jMrSLhsXgdQ3Ey8Ai4r2X0e/yUBgBhXvQrQYNrHJdy6nBYUE/flZ9jluyUYB7lkMeRpfpgq1kZF7z6sUNFAeyOZTuWwNYiTAEiWeE9iqum+w5z973xVB7HPCygV1oQfUqRChsFmFTOlTgxuYqWlyE6Xg=
===> scheduling task: upload-task-5040
===> respond_decision_task_completed returned None
===> polling for activity...
===> got activity:
{
    "activityId": "upload-task-5040",
    "activityType": {
        "name": "UploadToS3Archive",
        "version": "1"
    },
    "input": "name of bucket to upload",
    "startedEventId": 6,
    "taskToken": "AAAAKgAAAAEAAAAAAAAAAWUE8JahyprC8o52pkhvexLgR0U9BS4gARhLfzW5sB9zxLl/hu/I3rJ5tIsvsJM8hPE8CcbrnI9CxjlDTEc5yWCNeA3g3KsZo1qTH69H7p5BTz3peW5aGk6To5Faz+hmEl+hnOGY8oor4S0VQVYm7Dx4djzOxfkwj4nHUv/Vm1UhU7IaN0VVc4RRCACBfEojZYWUKCMIwOn72RkLMHirROiKLKyVtvKaD95bBIObqA0YsaEbtMiIoFjhF2J4WVcXPvdS4eqAUPlkfEHuuY4GRmE=",
    "workflowExecution": {
        "runId": "646ef96c-21db-4195-9c62-e33993c9e284",
        "workflowId": "job-2444"
    }
}
==> respond_activity_task_completed returned None
===> polling for decision...
===> got decision:
{
    "events": [
        {
            "eventId": 1,
            "eventTimestamp": 1339460909.346,
            "eventType": "WorkflowExecutionStarted",
            "workflowExecutionStartedEventAttributes": {
                "childPolicy": "TERMINATE",
                "executionStartToCloseTimeout": "3600",
                "input": "{ \"file\" : \"testfile\", \"url\"  : \"http://staging.pbs.org/uploads/sherlock.mp4\" }",
                "parentInitiatedEventId": 0,
                "taskList": {
                    "name": "MainTaskList"
                },
                "taskStartToCloseTimeout": "3600",
                "workflowType": {
                    "name": "FileIngest",
                    "version": "1"
                }
            }
        },
        {
            "decisionTaskScheduledEventAttributes": {
                "startToCloseTimeout": "3600",
                "taskList": {
                    "name": "MainTaskList"
                }
            },
            "eventId": 2,
            "eventTimestamp": 1339460909.346,
            "eventType": "DecisionTaskScheduled"
        },
        {
            "decisionTaskStartedEventAttributes": {
                "scheduledEventId": 2
            },
            "eventId": 3,
            "eventTimestamp": 1339460909.478,
            "eventType": "DecisionTaskStarted"
        },
        {
            "decisionTaskCompletedEventAttributes": {
                "scheduledEventId": 2,
                "startedEventId": 3
            },
            "eventId": 4,
            "eventTimestamp": 1339460909.683,
            "eventType": "DecisionTaskCompleted"
        },
        {
            "activityTaskScheduledEventAttributes": {
                "activityId": "upload-task-5040",
                "activityType": {
                    "name": "UploadToS3Archive",
                    "version": "1"
                },
                "control": "control data",
                "decisionTaskCompletedEventId": 4,
                "heartbeatTimeout": "300",
                "input": "name of bucket to upload",
                "scheduleToCloseTimeout": "300",
                "scheduleToStartTimeout": "300",
                "startToCloseTimeout": "21600",
                "taskList": {
                    "name": "NewFiles"
                }
            },
            "eventId": 5,
            "eventTimestamp": 1339460909.683,
            "eventType": "ActivityTaskScheduled"
        },
        {
            "activityTaskStartedEventAttributes": {
                "scheduledEventId": 5
            },
            "eventId": 6,
            "eventTimestamp": 1339460909.891,
            "eventType": "ActivityTaskStarted"
        },
        {
            "activityTaskCompletedEventAttributes": {
                "result": "All done - thanks for coming",
                "scheduledEventId": 5,
                "startedEventId": 6
            },
            "eventId": 7,
            "eventTimestamp": 1339460909.992,
            "eventType": "ActivityTaskCompleted"
        },
        {
            "decisionTaskScheduledEventAttributes": {
                "startToCloseTimeout": "3600",
                "taskList": {
                    "name": "MainTaskList"
                }
            },
            "eventId": 8,
            "eventTimestamp": 1339460909.992,
            "eventType": "DecisionTaskScheduled"
        },
        {
            "decisionTaskStartedEventAttributes": {
                "scheduledEventId": 8
            },
            "eventId": 9,
            "eventTimestamp": 1339460910.117,
            "eventType": "DecisionTaskStarted"
        }
    ],
    "previousStartedEventId": 3,
    "startedEventId": 9,
    "taskToken": "AAAAKgAAAAEAAAAAAAAAAdiH6rjqEKrBimSWRvzosCyMdfZNDBUbN+kd+3JDP99j386L416hs/iZwU1fxf0gl9WlXhp6mw5VAfBUw/kGRVSN2fp+D94GuFSbFA1VCkzJY0/ZzPGJ2nYIFaRq6SpK2AjD1D4CgcZ1K05XhQfEoq/BX9CVQu60Q0Qk2eQtVhjdAR7AJFgNERioJDz0V9S+1CdaY8KZbcgu2bJYEmrR9wI26qUVfwjaiMQfe1QZBTWc/ofcUm+CNtUdOd++hLp/xy1OuKXeneyIHrta238b6rU=",
    "workflowExecution": {
        "runId": "646ef96c-21db-4195-9c62-e33993c9e284",
        "workflowId": "job-2444"
    },
    "workflowType": {
        "name": "FileIngest",
        "version": "1"
    }
}
===> token = AAAAKgAAAAEAAAAAAAAAAdiH6rjqEKrBimSWRvzosCyMdfZNDBUbN+kd+3JDP99j386L416hs/iZwU1fxf0gl9WlXhp6mw5VAfBUw/kGRVSN2fp+D94GuFSbFA1VCkzJY0/ZzPGJ2nYIFaRq6SpK2AjD1D4CgcZ1K05XhQfEoq/BX9CVQu60Q0Qk2eQtVhjdAR7AJFgNERioJDz0V9S+1CdaY8KZbcgu2bJYEmrR9wI26qUVfwjaiMQfe1QZBTWc/ofcUm+CNtUdOd++hLp/xy1OuKXeneyIHrta238b6rU=
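
The full JSON dump is useful once, but it gets long quickly. A compact view of the history is often easier to read while debugging. The following sketch condenses the `events` list from a decision poll into one line per event and reports how long the execution has been running. The function names are hypothetical, and it relies only on the `eventId`, `eventType` and `eventTimestamp` fields visible in the output above.

```python
def summarize_history(events):
    """Return one 'id type' string per event, in history order."""
    return ['%d %s' % (e['eventId'], e['eventType']) for e in events]


def elapsed_seconds(events):
    """Return the time between the earliest and latest event, in seconds."""
    if not events:
        return 0.0
    stamps = [e['eventTimestamp'] for e in events]
    return max(stamps) - min(stamps)


# First and last events of the second decision poll shown above.
sample = [
    {'eventId': 1, 'eventTimestamp': 1339460909.346,
     'eventType': 'WorkflowExecutionStarted'},
    {'eventId': 9, 'eventTimestamp': 1339460910.117,
     'eventType': 'DecisionTaskStarted'},
]
for line in summarize_history(sample):
    print(line)
print('elapsed: %.3f s' % elapsed_seconds(sample))
```

In the script, the same helpers could be called with `decision['events']` right after each poll.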