Time to Run with AWS
Overview
As a runner, I’ve always enjoyed the pursuit of pacing faster today relative to yesterday. With the advent of apps like Strava that track your performance (among many other metrics), it’s easy to measure whether you are running faster over time and, perhaps more importantly, which factors affect your run pace. Indeed, based on my historical running data, I’ve noticed two factors that moderate my run times: time of day and weather. My fastest runs usually happen between 12 PM and 7 PM, and my slower runs occur in high winds, cold weather (below 30°F), hot weather (above 90°F), or rain (being wet makes me miserable). On these “bad weather” days, I’d prefer to run inside on the treadmill and wait for more optimal running conditions.
With these criteria in mind, I would begin most mornings by deciding if it was an “inside” or “outside” running day by executing the following mental steps:
☁️ Log on to weather.com at 7 AM
☁️ Check the hourly forecast between 12 PM and 5 PM
☁️ Check the temperature, wind speed, and chance of precipitation
☁️ Make a “Yes” or “No” decision to run outside based on the forecast
While repeating these steps each day wasn’t a huge inconvenience, it required a few minutes of my morning. Perhaps more importantly, it was yet another decision that needed attention. I make lots of decisions in a day, and each one requires thought and energy. Thus, if I could automate one of those decisions by creating a “rules engine,” I’d save the time and cognition required to plan my daily run.
The journey of automating this process is what inspired the following post, which will cover a few key concepts, including:
- Scheduling a workflow using Amazon EventBridge
- Building Lambda functions
- Sending emails via Amazon SES
These concepts can be generalized to any recurring process. Perhaps it’s a daily forecast that planners use to manage a store’s inventory. Or maybe it’s a marketing email sent to new customers after their first purchase. Extracting data from a database or API, applying some business logic, and then socializing the results through an email is a standard analytics workflow. Read on to learn how I leveraged this approach to identify the best times to run each day, save myself a few minutes, and remove one more decision from my day.
Be The Algorithm
Before diving into the solution, I want to quickly share a technique I’ve found helpful when automating a decision-making process: before writing code or building queries, manually step through the process you are trying to automate. That is, assume the role of the computer or algorithm. Repeating the steps above each day made it clear how I would automate the decision by identifying:
- The information I needed to make a decision
- The timing and frequency of the decision
- The values of the criteria that would lead to an “inside” or “outside” run decision
You could easily sub in a machine learning model to discover the rules, but the overall process flow would remain essentially unchanged. Keep this in mind the next time you set out to automate a decision.
In the next section, we’ll cover the technical specifics.
Architecture Overview
No post on AWS would be complete without a diagram outlining how data flows through our system. Accordingly, the figure above depicts the order of operations and each service’s role in our decision workflow. Each service is described in greater detail below.
EventBridge - this is our scheduler. Each weekday at noon PST, Amazon EventBridge initiates the first Lambda function (TimeToRun).
Lambda (TimeToRun) - TimeToRun connects to the OpenWeather API, extracts weather forecasts for my latitude and longitude, and formats the resulting data. The forecasts are then saved to an S3 bucket.
Lambda (SendRunningEmail) - SendRunningEmail is triggered by activity in the S3 bucket that holds the hourly weather forecasts: when a new object lands in the bucket, the Lambda function automatically starts and retrieves the data.
Amazon SES - while this service is invoked from within the SendRunningEmail Lambda, I’ve broken it out separately since it’s such a helpful service. Sending emails through Python can be tricky, and I’ve found the Amazon SES approach much easier: you import the service, define the message contents, add a bit of HTML (to make it look pretty, of course), and send the message to the desired email addresses. It’s that simple.
Personal Gmail - this is where the resulting message lands, alerting me to whether it is an “inside” or “outside” running day.
In the following sections, we’ll cover the two Lambda functions that comprise this workflow. We’ll also cover a few “gotchas” that come up frequently when working with Lambda for the first time.
Part 1: TimeToRun Lambda
The first part will cover the data collection process, which briefly entails:
1. Scheduling
2. Extracting hourly forecasts from OpenWeather API
3. Saving the forecasts to an S3 bucket
We’ll use EventBridge for scheduling, which you can see in the diagram on the left below.
To connect Lambda with EventBridge, you add a trigger and then indicate how frequently you want it to execute. The desired cadence for the hourly weather forecasts was every weekday at 7 PM GMT (or noon PST), expressed via Cron below.
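In EventBridge’s six-field cron syntax (minutes, hours, day-of-month, month, day-of-week, year), that weekday cadence would look something like this:

cron(0 19 ? * MON-FRI *)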
Now that we’ve scheduled our Lambda function, the next step is to add logic that collects the forecasts and saves them to S3.
import os
import sys
import json
import logging
from datetime import datetime

import boto3
import pytz
import requests

S3_BUCKET = "running-weather-data"

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def retrieve_weather_data(units_of_measure: str) -> dict:
    """Call the OpenWeather One Call API for the configured coordinates."""
    api_key = os.environ["WEATHER_API_KEY"]
    lat = os.environ["LOCATION_LATITUDE"]
    lon = os.environ["LOCATION_LONGITUDE"]
    base_url = "https://api.openweathermap.org/data/2.5/onecall?"
    url = f"{base_url}lat={lat}&lon={lon}&appid={api_key}&units={units_of_measure}"
    response = requests.get(url)
    weather_data = json.loads(response.text)
    return weather_data


def parse_weather_data(weather_hour: dict) -> dict:
    """Keep only the fields needed for the running decision."""
    hour = datetime.fromtimestamp(
        weather_hour["dt"], pytz.timezone("America/Los_Angeles")
    ).hour
    temp = weather_hour["temp"]
    wind_speed = weather_hour["wind_speed"]
    weather_status = weather_hour["weather"][0]
    status = weather_status["main"]
    return {"hour": hour, "temp": temp, "wind_speed": wind_speed, "status": status}


def is_today_weather(weather_hour: dict, timezone: str = "America/Los_Angeles") -> bool:
    """Return True if the forecast hour falls on today's (local) date."""
    weather_fmt = "%Y-%m-%d"
    today_dt = datetime.now(pytz.timezone(timezone)).strftime(weather_fmt)
    weather_dt = datetime.fromtimestamp(weather_hour["dt"], pytz.timezone(timezone))
    return weather_dt.strftime(weather_fmt) == today_dt


def _generate_s3_path() -> str:
    """Build a dated S3 key of the form data/<YYYY-MM-DD>-running-times.json."""
    year, month, day = datetime.now().strftime("%Y-%m-%d").split("-")
    s3_path = f"data/{year}-{month}-{day}-running-times.json"
    return s3_path


def save_json_to_s3(json_data: dict, s3_bucket: str) -> None:
    """Serialize the forecast dictionary and write it to S3."""
    s3 = boto3.resource("s3")
    response = s3.Object(s3_bucket, _generate_s3_path()).put(
        Body=bytes(json.dumps(json_data).encode("UTF-8"))
    )
    if response.get("ResponseMetadata", {}).get("HTTPStatusCode") == 200:
        logger.info("Data successfully landed")


def lambda_handler(event, context):
    try:
        # retrieve the weather forecast from the OpenWeather API
        weather_data = retrieve_weather_data(units_of_measure="imperial")
        # extract the hourly forecast
        hourly_data = weather_data["hourly"]
        # flag the hours that fall on today's date
        today_weather_bool = [is_today_weather(x) for x in hourly_data]
        # keep only the fields relevant to the outside-run decision
        hourly_data = [parse_weather_data(x) for x in hourly_data]
        # filter to today's hourly data
        today_hourly_data = [
            today_weather
            for (today_weather, is_today) in zip(hourly_data, today_weather_bool)
            if is_today
        ]
        # wrap the hourly records in a dictionary
        hourly_data_dict = {"weather_data": today_hourly_data}
        # save the hourly weather data to the S3 bucket as .json
        # (save_json_to_s3 handles the json.dumps, so we pass the dictionary directly)
        save_json_to_s3(json_data=hourly_data_dict, s3_bucket=S3_BUCKET)
        return {"statusCode": 200, "body": json.dumps(hourly_data_dict)}
    except Exception:
        exception_type, exception_value, exception_traceback = sys.exc_info()
        err_msg = json.dumps(
            {"errorType": exception_type.__name__, "errorMessage": str(exception_value)}
        )
        logger.error(err_msg)
This entire block of code is triggered daily, landing a single .json file in the desired S3 Bucket. While this looks straightforward, it’s not as simple as copy-pasting your code and hitting play. Like most things in the AWS ecosystem, getting everything to work takes a few tries. The subsections below highlight areas that are potential sources of confusion when starting with Lambda.
Configuring Environment Variables
Environment variables store sensitive information, such as API keys, passwords, or other private values. In this case, I’ve stored the OpenWeather API key and the latitude/longitude of the location I want daily forecasts for. The image below depicts how to add these variables via the console.
And this is where these variables are accessed in the Lambda code.
def retrieve_weather_data(units_of_measure: str) -> dict:
    """Call the OpenWeather One Call API for the configured coordinates."""
    api_key = os.environ["WEATHER_API_KEY"]
    lat = os.environ["LOCATION_LATITUDE"]
    lon = os.environ["LOCATION_LONGITUDE"]
    base_url = "https://api.openweathermap.org/data/2.5/onecall?"
    url = f"{base_url}lat={lat}&lon={lon}&appid={api_key}&units={units_of_measure}"
    response = requests.get(url)
    weather_data = json.loads(response.text)
    return weather_data
Note that this approach to managing keys and constants is sufficient for smaller projects and prototypes. However, for larger projects where you are collaborating with other developers and stakeholders, configuration data will likely live in a dedicated, versioned store such as AWS Systems Manager Parameter Store or Secrets Manager rather than in per-function environment variables.
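As a rough sketch of that alternative, the same values could be read from Parameter Store at runtime with boto3; the parameter names below are hypothetical, and the Lambda's execution role would also need permission to call ssm:GetParameter.

import boto3

ssm = boto3.client("ssm")

def get_parameter(name: str) -> str:
    # fetch a single (optionally encrypted) parameter by name
    response = ssm.get_parameter(Name=name, WithDecryption=True)
    return response["Parameter"]["Value"]

# hypothetical parameter names for this project
api_key = get_parameter("/time-to-run/weather-api-key")
lat = get_parameter("/time-to-run/latitude")
lon = get_parameter("/time-to-run/longitude")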
Adding Layers
A layer is a .zip file that includes additional code or data. As you may have noticed in the retrieve_weather_data function, we use the requests package to access the OpenWeather API. Requests is not part of the Python Standard Library, so we must include it as part of a layer (there is no way to pip install requests or any other third-party library directly in the Lambda runtime). While a full explanation of adding a layer is beyond the scope of this post, the following article nicely summarizes how to incorporate third-party libraries on Lambda.
Adding Permissions
Any time you set up a service through AWS, the default is minimal permissions. Among other things, permissions allow your Lambda function to interact with other AWS services. For example, the TimeToRun Lambda function writes the weather forecasts to an S3 bucket. The ability to interact with S3 is not set up by default, so you’ll have to attach a policy. Below, I’ve enabled AmazonS3FullAccess, which grants full access to S3. You’ll need to do the same for the second Lambda function as well.
If you ever receive an error message like “…is not authorized to perform…”, it can usually be solved by updating the permissions for the service in question.
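If you’d rather not grant full S3 access, a narrower alternative is an inline policy scoped to just the forecast bucket. Here’s a minimal sketch, attached with boto3; the role and policy names are hypothetical.

import json
import boto3

# allow reading, writing, and listing objects in the forecast bucket only
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:PutObject", "s3:GetObject", "s3:ListBucket"],
            "Resource": [
                "arn:aws:s3:::running-weather-data",
                "arn:aws:s3:::running-weather-data/*",
            ],
        }
    ],
}

iam = boto3.client("iam")
iam.put_role_policy(
    RoleName="TimeToRun-execution-role",  # hypothetical Lambda execution role
    PolicyName="running-weather-s3-access",
    PolicyDocument=json.dumps(policy),
)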
Run Time Limit
A second default setting that might not be immediately obvious is the run-time limit, which controls how long AWS will let a Lambda run before terminating it. The default is three seconds. Depending on the processing time, I’ll usually increase the limit to 30 seconds and then adjust up or down from there. The image below shows where you can change the run time, or the memory for more compute-heavy tasks.
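The same settings can also be changed without the console; a quick sketch with boto3, assuming the function is named TimeToRun as above:

import boto3

lambda_client = boto3.client("lambda")

# raise the timeout to 30 seconds and give the function a bit more memory
lambda_client.update_function_configuration(
    FunctionName="TimeToRun",
    Timeout=30,
    MemorySize=256,
)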
If you’ve successfully implemented all of the steps above, you should receive something that looks like this when testing the function:
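Given the handler’s return statement, a successful test result has roughly this shape (the weather values here are purely illustrative):

{
    "statusCode": 200,
    "body": "{\"weather_data\": [{\"hour\": 13, \"temp\": 55, \"wind_speed\": 5, \"status\": \"Clouds\"}, ...]}"
}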
This response indicates that everything ran smoothly and you are ready for the next section!
Part 2: SendRunningEmail Lambda
The second part of this post covers the data formatting and transmission process in four steps:
- Extract data from S3
- Determine if “inside” or “outside” running day
- Format the decision text (so it looks nice, of course)
- Send the decision to the desired email address(es)
import sys
import json
import logging
from datetime import datetime
from typing import List

import boto3

S3_BUCKET = "<weather-data-bucket-name>"
SENDER = "<sender-email-address>"
RECIPIENT = "<recipient-email-addresses>"
AWS_REGION = "us-west-2"
SUBJECT = "Best Times to Run Today"
CHARSET = "UTF-8"

RUNNING_CONDS = {
    "hour": {"min_hour": 13, "max_hour": 19},
    "status": ["Rain", "Snow", "Smoke"],
    "wind_speed": {"min_speed": 0, "max_speed": 30},
    "temp": {"min_temp": 30, "max_temp": 90},
}

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def find_most_recent_data_path(s3_bucket: str) -> str:
    """Locate today's forecast file in the S3 bucket."""
    today_dt = datetime.now().strftime("%Y-%m-%d")
    s3 = boto3.resource("s3")
    bucket = s3.Bucket(s3_bucket)
    existing_data = [
        x.key
        for x in bucket.objects.all()
        if str(x.key).startswith("data") and str(x.key).endswith("-running-times.json")
    ]
    most_recent_dt = max(
        [x.split("/")[-1].replace("-running-times.json", "") for x in existing_data]
    )
    assert most_recent_dt == today_dt, "No Data Found for Today's Date"
    s3_key = [x for x in existing_data if most_recent_dt in x][0]
    return s3_key


def read_json_from_s3(s3_bucket: str, s3_key: str) -> dict:
    """Read the forecast .json object and parse it into a dictionary."""
    s3 = boto3.resource("s3")
    obj = s3.Object(s3_bucket, s3_key)
    file_content = obj.get()["Body"].read().decode("utf-8")
    json_content = json.loads(file_content)
    return json_content


def _convert_to_12hr_format(hr: int) -> str:
    """Convert a 24-hour integer (e.g., 13) to a 12-hour label (e.g., 1:00 PM)."""
    return datetime.strptime(str(hr), "%H").strftime("%I:%M %p").strip("0")


def format_run_times(run_times: List[dict]) -> List[str]:
    """Turn each qualifying hour into a human-readable, lightly formatted line."""
    if run_times:
        hour_fmt = [
            f"<b>{_convert_to_12hr_format(x.get('hour'))}:</b>" for x in run_times
        ]
        temp_fmt = [f"{round(x.get('temp'))}F with" for x in run_times]
        wind_speed_fmt = [
            f"wind at {round(x.get('wind_speed'))} mph" for x in run_times
        ]
        status_fmt = [f"and {x.get('status').lower()}" for x in run_times]
        fmt_msg = zip(hour_fmt, temp_fmt, wind_speed_fmt, status_fmt)
        fmt_msg_list = [" ".join(x) for x in fmt_msg]
        return fmt_msg_list
    else:
        return ["No Times to Run Today!"]


def is_time_for_run(weather_hour: dict) -> bool:
    """Return True only if the hour, temperature, wind, and status criteria are all met."""
    is_time = (
        RUNNING_CONDS["hour"]["min_hour"]
        <= weather_hour["hour"]
        <= RUNNING_CONDS["hour"]["max_hour"]
    )
    is_temp = (
        RUNNING_CONDS["temp"]["min_temp"]
        <= weather_hour["temp"]
        <= RUNNING_CONDS["temp"]["max_temp"]
    )
    is_wind = (
        RUNNING_CONDS["wind_speed"]["min_speed"]
        <= weather_hour["wind_speed"]
        <= RUNNING_CONDS["wind_speed"]["max_speed"]
    )
    is_status = weather_hour["status"] not in RUNNING_CONDS["status"]
    return all([is_time, is_temp, is_wind, is_status])


def lambda_handler(event, context):
    # generate the S3 key for the most recent weather data
    running_data_path = find_most_recent_data_path(s3_bucket=S3_BUCKET)
    # read the forecast .json into a dictionary
    hourly_weather_data = read_json_from_s3(
        s3_bucket=S3_BUCKET, s3_key=running_data_path
    )
    # extract the hourly weather records
    hourly_data_dict = hourly_weather_data["weather_data"]
    # True/False filter based on the hour, temperature, wind-speed, and status criteria
    run_time_bool = [is_time_for_run(x) for x in hourly_data_dict]
    # keep only the hours where all criteria are met
    run_times = [
        time
        for (time, time_to_run) in zip(hourly_data_dict, run_time_bool)
        if time_to_run
    ]
    # beautify the message with a sprinkle of HTML
    running_msg_lst = format_run_times(run_times)
    running_msg_str = "<p>" + "<br/>".join(running_msg_lst) + "</p>"
    running_msg = f"""<html>
    <head></head>
    <body>
      <h1>Best Times to Run</h1>
      {running_msg_str}
    </body>
    </html>
    """
    try:
        client = boto3.client("ses", region_name=AWS_REGION)
        response = client.send_email(
            Destination={
                "ToAddresses": [
                    RECIPIENT,
                ]
            },
            Message={
                "Body": {
                    "Html": {
                        "Charset": CHARSET,
                        "Data": running_msg,
                    },
                },
                "Subject": {
                    "Charset": CHARSET,
                    "Data": SUBJECT,
                },
            },
            Source=SENDER,
        )
        logger.info("Email sent, message ID: %s", response["MessageId"])
    except Exception:
        exception_type, exception_value, exception_traceback = sys.exc_info()
        err_msg = json.dumps(
            {"errorType": exception_type.__name__, "errorMessage": str(exception_value)}
        )
        logger.error(err_msg)
Most of the logic is concerned with accessing and formatting the data we collected in the first part. However, this is also where we determine whether it’s an “inside” or “outside” run. The two sections highlighted below are responsible for making that decision.
RUNNING_CONDS = {
    "hour": {"min_hour": 13, "max_hour": 19},
    "status": ["Rain", "Snow", "Smoke"],
    "wind_speed": {"min_speed": 0, "max_speed": 30},
    "temp": {"min_temp": 30, "max_temp": 90},
}
These are all of the criteria - time, status (I probably don’t want to run if there’s 🔥 wildfire smoke 🔥), wind speed, and temperature - and the limits used in making the running decision. The is_time_for_run function ensures that the forecast data satisfies all four conditions.
def is_time_for_run(weather_hour: dict) -> bool:
    """Return True only if the hour, temperature, wind, and status criteria are all met."""
    is_time = (
        RUNNING_CONDS["hour"]["min_hour"]
        <= weather_hour["hour"]
        <= RUNNING_CONDS["hour"]["max_hour"]
    )
    is_temp = (
        RUNNING_CONDS["temp"]["min_temp"]
        <= weather_hour["temp"]
        <= RUNNING_CONDS["temp"]["max_temp"]
    )
    is_wind = (
        RUNNING_CONDS["wind_speed"]["min_speed"]
        <= weather_hour["wind_speed"]
        <= RUNNING_CONDS["wind_speed"]["max_speed"]
    )
    is_status = weather_hour["status"] not in RUNNING_CONDS["status"]
    return all([is_time, is_temp, is_wind, is_status])
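As a quick sanity check of those conditions (the values here are made up), a 2 PM hour at 55°F with a 5 mph wind and clear skies passes every check:

sample_hour = {"hour": 14, "temp": 55, "wind_speed": 5, "status": "Clear"}

# 13 <= 14 <= 19, 30 <= 55 <= 90, 0 <= 5 <= 30, and "Clear" is not a blocked status
print(is_time_for_run(sample_hour))  # True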
I’ll receive a message (like the one below) in my inbox every weekday at noon when these conditions are met.
Otherwise, I’ll receive the message below:
Overall, it looks like a solid day for a run. The one thing to note is that 3 PM and 4 PM do not have any information. The absence of data at these times indicates that at least one of the criteria was not met. Indeed, the local weather forecast showed rain for those hours, so they were automatically filtered out of the message, leaving only the times that met all four criteria. Portland, Oregon (my home) is a rainy place, and this sort of granular information is beneficial on days when you get only a brief window of dryness to go run.
Parting Thoughts
I hope this was a helpful introduction to setting up and running a basic Lambda workflow. It’s a useful service, and I’ve found numerous applications for it in my day-to-day life beyond just planning my daily run. Please feel free to comment below if you have any thoughts or questions. Until next time, happy coding!