Deploy and Visualize Machine Learning Model in a Cost-Effective Manner



Build, Train and Tune the Machine Learning Model: Scikit-Learn is used within AWS SageMaker to build and train the gold price prediction model. AWS SageMaker is a fully managed service that gives every developer and data scientist the ability to build, train, and deploy machine learning (ML) models quickly, removing the heavy lifting from each step of the process so it is easier to develop high-quality models. For details, you may refer to an article I published earlier.

(Lambda 1) Deploy the Machine Learning Model with Lambda: AWS Lambda is one of the key components of the AWS Serverless Platform. It lets you run code in response to triggers without provisioning or managing servers. In the backend, I make heavy use of AWS Lambda to prepare all the data that feeds the web application.

In the web application I have built, the gold price prediction is only required once a day. A more cost-effective approach is to build an AWS Lambda function that deploys the trained Scikit-Learn model, stored in AWS S3, by creating an Amazon SageMaker endpoint once a day; the endpoint is deleted after all required tasks are completed.

Though AWS Lambda is one of the key components, handling external libraries can be tricky, and the 50 MB size constraint on the zipped deployment package makes things a bit more complicated. In my web application, the AWS Lambda application (a combination of Lambda functions, event sources, and other resources that work together to perform tasks) is developed with AWS Cloud9.

In the AWS Cloud9 terminal, run this command to install the required modules. They are installed into the folder that contains your Lambda function code, so they are packaged together with it.

python -m pip install --target=./ ModuleName
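
For example, assuming the two Lambda functions below need only the SageMaker SDK, pandas and yahoofinancials as external dependencies, the command would be:

python -m pip install --target=./ sagemaker pandas yahoofinancials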

Now we are ready to write the code that deploys the machine learning model, using the trained Scikit-Learn model and configuration stored in AWS S3. The Lambda function below simply deploys the model as an AWS SageMaker endpoint for subsequent gold price prediction.

from sagemaker.sklearn.model import SKLearnModel

EXECUTION_ROLE = 'YOUR_EXECUTION_ROLE'
FRAMEWORK_VERSION = '0.23-1'
artifact = 'S3_LOCATION_FOR_SCIKIT_MODEL'

def create_endpoint(endpoint_name, config_name):
    try:
        # Wrap the trained Scikit-Learn artifact stored in S3
        model = SKLearnModel(model_data=artifact,
                             role=EXECUTION_ROLE,
                             entry_point='script_model.py',
                             framework_version=FRAMEWORK_VERSION)
        # Deploy the model as a real-time SageMaker endpoint
        predictor = model.deploy(endpoint_name=endpoint_name,
                                 instance_type='ml.c5.large',
                                 initial_instance_count=1)
        return 'Success'
    except Exception as e:
        print(e)
        raise e

def lambda_handler(event, context):
    endpoint_name = 'NAME_OF_YOUR_ENDPOINT'
    config_name = 'ENDPOINT_CONFIG_IN_SAGEMAKER'
    create_endpoint(endpoint_name, config_name)
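
The entry_point script script_model.py is referenced above but not listed in this article. As a minimal sketch, assuming the training job saved the model with joblib as model.joblib, the SageMaker Scikit-Learn serving container only needs a model_fn to load it:

# script_model.py -- minimal inference entry point (assumes model.joblib)
import os
import joblib

def model_fn(model_dir):
    # Called by the SageMaker Scikit-Learn container to load the trained model
    return joblib.load(os.path.join(model_dir, 'model.joblib'))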

Once the AWS Lambda function is published through AWS Cloud9, a trigger has to be created to invoke it. Create an EventBridge rule through the Amazon EventBridge console (one that fires on a schedule in my case) and specify a cron expression that defines when the AWS Lambda function is to be triggered.

The following cron expression is used in my application to create an AWS SageMaker endpoint at 05:00 (UTC), Monday through Saturday, for the gold prediction handled by another AWS Lambda function.

cron(0 05 ? * MON-SAT *)
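
The rule can also be scripted instead of clicked together in the console. Here is a minimal boto3 sketch, assuming placeholder rule and function names:

import boto3

events = boto3.client('events')
lam = boto3.client('lambda')

# Create (or update) the scheduled rule with the cron expression above
rule = events.put_rule(
    Name='daily-create-endpoint',
    ScheduleExpression='cron(0 05 ? * MON-SAT *)',
    State='ENABLED'
)

# Allow EventBridge to invoke the deployment Lambda function
lam.add_permission(
    FunctionName='LAMBDA_1_FUNCTION_NAME',
    StatementId='eventbridge-daily-trigger',
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=rule['RuleArn']
)

# Point the rule at the Lambda function
events.put_targets(
    Rule='daily-create-endpoint',
    Targets=[{'Id': 'lambda1', 'Arn': 'LAMBDA_1_FUNCTION_ARN'}]
)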

(Lambda 2) Perform Prediction with the Newly Created Endpoint: Now we develop another AWS Lambda function that performs the gold price prediction using the AWS SageMaker endpoint created by the previous one.

Again, the AWS Cloud9 environment is used to deploy this AWS Lambda function. First, retrieve the current day's gold price and US dollar index with the yahoofinancials module; then use SKLearnPredictor to make the prediction and store the result in Amazon DynamoDB; finally, delete the AWS SageMaker endpoint. The features used for the prediction are the 20-, 50- and 200-day moving averages of the gold price and the US dollar index.

# import all required libraries
import boto3
import pandas as pd
from yahoofinancials import YahooFinancials
from datetime import datetime, timezone, timedelta
from sagemaker.sklearn.model import SKLearnPredictor

ticker = ["GC=F", "DX-Y.NYB"]
names = ["Gold", "USDX"]
sagemaker = boto3.client('sagemaker')
runtime = boto3.client('runtime.sagemaker')
dydb = boto3.client('dynamodb')
EXECUTION_ROLE = 'YOUR_EXECUTION_ROLE'
FRAMEWORK_VERSION = '0.23-1'
artifact = 'SCIKIT_MODEL_IN_S3'
bucket = 'S3_BUCKET'
endpoint_name = 'SAGEMAKER_ENDPOINT_NAME'
config_name = 'SAGEMAKER_ENDPOINT_CONFIG'
table_name = 'DYNAMODB_TABLE_NAME'

# get the gold price and US dollar index up to the current day
def get_yf_data(start_date, end_date):
    date_range = pd.bdate_range(start=start_date, end=end_date)
    values = pd.DataFrame({'Date': date_range})
    values['Date'] = pd.to_datetime(values['Date'])

    for i in ticker:
        raw_data = YahooFinancials(i)
        raw_data = raw_data.get_historical_price_data(start_date, end_date, "daily")
        df = pd.DataFrame(raw_data[i]['prices'])[['formatted_date', 'adjclose']]
        df.columns = ['Date1', names[ticker.index(i)]]
        df['Date1'] = pd.to_datetime(df['Date1'])
        values = values.merge(df, how='left', left_on='Date', right_on='Date1')
        values = values.drop(labels='Date1', axis=1)

    # drop the last row if it is today's (still incomplete) trading day
    current_date = datetime.now(timezone.utc)
    if values.iloc[-1, 0].strftime('%Y-%m-%d') >= current_date.date().strftime('%Y-%m-%d'):
        values.drop(values.tail(1).index, inplace=True)

    # fill gaps (e.g. market holidays) forward, then backward
    values = values.fillna(method="ffill", axis=0)
    values = values.fillna(method="bfill", axis=0)
    cols = values.columns.drop('Date')
    values[cols] = values[cols].apply(pd.to_numeric, errors='coerce').round(decimals=1)
    return values

# prepare input data (moving averages) for the prediction model
def prepare_predict_data(raw_data):
    raw_data['Gold/20SMA'] = raw_data[names[0]].rolling(window=20).mean()
    raw_data['Gold/50SMA'] = raw_data[names[0]].rolling(window=50).mean()
    raw_data['Gold/200SMA'] = raw_data[names[0]].rolling(window=200).mean()
    raw_data['USDX/20SMA'] = raw_data[names[1]].rolling(window=20).mean()
    raw_data['USDX/50SMA'] = raw_data[names[1]].rolling(window=50).mean()
    raw_data['USDX/200SMA'] = raw_data[names[1]].rolling(window=200).mean()
    return raw_data

def lambda_handler(event, context):
    current_date = datetime.now(timezone.utc)
    # fetch 300 calendar days so the 200-day moving average has enough history
    days_200_before = current_date - timedelta(days=300)
    raw_data = get_yf_data(days_200_before.date().strftime('%Y-%m-%d'),
                           current_date.date().strftime('%Y-%m-%d'))

    predict_x = prepare_predict_data(raw_data)

    # get only the current date figures
    new_dydb_record = predict_x.tail(1).copy()
    predict_x = predict_x.tail(1)
    predict_x = predict_x.drop(['Date', 'Gold', 'USDX'], axis=1)

    # attach to the endpoint created by the first Lambda function
    predictor = SKLearnPredictor(endpoint_name)

    predict_x_array = predict_x.to_numpy()
    label_predict = predictor.predict(predict_x_array)

    new_dydb_record['nextday'] = float(label_predict[0])

    vDate = new_dydb_record['Date'].iloc[0]
    strDate = vDate.strftime('%Y-%m-%d')
    numGold = float(new_dydb_record['Gold'].iloc[0])
    numUSDX = float(new_dydb_record['USDX'].iloc[0])
    numG20SMA = float(new_dydb_record['Gold/20SMA'].iloc[0])
    numG50SMA = float(new_dydb_record['Gold/50SMA'].iloc[0])
    numG200SMA = float(new_dydb_record['Gold/200SMA'].iloc[0])
    numU20SMA = float(new_dydb_record['USDX/20SMA'].iloc[0])
    numU50SMA = float(new_dydb_record['USDX/50SMA'].iloc[0])
    numU200SMA = float(new_dydb_record['USDX/200SMA'].iloc[0])
    numnextday = float(new_dydb_record['nextday'].iloc[0])

    print('new record:')
    print('strDate: ' + strDate)
    print('numGold: ' + str(numGold))
    print('numUSDX: ' + str(numUSDX))
    print('numG20SMA: ' + str(numG20SMA))
    print('numG50SMA: ' + str(numG50SMA))
    print('numG200SMA: ' + str(numG200SMA))
    print('numU20SMA: ' + str(numU20SMA))
    print('numU50SMA: ' + str(numU50SMA))
    print('numU200SMA: ' + str(numU200SMA))
    print('numnextday: ' + str(numnextday))

    # store the new record (features plus prediction) in DynamoDB
    try:
        response = dydb.put_item(
            TableName=table_name,
            Item={
                'date': {'S': str(strDate)},
                'gold': {'N': str(numGold)},
                'usdx': {'N': str(numUSDX)},
                'gold/20sma': {'N': str(numG20SMA)},
                'gold/50sma': {'N': str(numG50SMA)},
                'gold/200sma': {'N': str(numG200SMA)},
                'usdx/20sma': {'N': str(numU20SMA)},
                'usdx/50sma': {'N': str(numU50SMA)},
                'usdx/200sma': {'N': str(numU200SMA)},
                'nextdaygold': {'N': str(numnextday)},
            }
        )
    except Exception as e:
        print(e)

    # delete the endpoint (and its configuration) so no idle charges accrue
    predictor.delete_endpoint(delete_endpoint_config=True)

Amazon EventBridge is a serverless event bus that makes it easy to connect applications using data from your own applications and AWS services. EventBridge delivers a stream of real-time data from event sources, such as AWS SageMaker in this case, and routes it to targets like AWS Lambda.

After the AWS SageMaker endpoint has been created successfully by the first AWS Lambda function, a “SageMaker Endpoint State Change” event is delivered and caught by this second AWS Lambda function via an EventBridge rule with the following event pattern.

{
  "source": ["aws.sagemaker"],
  "detail-type": ["SageMaker Endpoint State Change"],
  "detail": {
    "EndpointName": ["ENDPOINT_NAME"],
    "EndpointStatus": ["IN_SERVICE"]
  }
}
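
If you prefer to create this rule programmatically rather than in the console, here is a minimal boto3 sketch (the rule name and target ARN are placeholders; the invoke permission is granted the same way as for the scheduled rule above):

import json
import boto3

events = boto3.client('events')

# Fire when the named endpoint transitions to IN_SERVICE
events.put_rule(
    Name='endpoint-in-service',
    EventPattern=json.dumps({
        'source': ['aws.sagemaker'],
        'detail-type': ['SageMaker Endpoint State Change'],
        'detail': {
            'EndpointName': ['ENDPOINT_NAME'],
            'EndpointStatus': ['IN_SERVICE']
        }
    }),
    State='ENABLED'
)

# Route matching events to the prediction Lambda function
events.put_targets(
    Rule='endpoint-in-service',
    Targets=[{'Id': 'lambda2', 'Arn': 'LAMBDA_2_FUNCTION_ARN'}]
)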

(Lambda 3) Export DynamoDB Data to CSV for Static Web Hosting: To simplify the frontend development, another AWS Lambda function is created to export each newly appended Amazon DynamoDB record to a CSV file in AWS S3. First, enable DynamoDB Streams on your table and associate the stream's Amazon Resource Name (ARN) with an AWS Lambda function that you write. As no external library is required, this AWS Lambda function is created directly in the AWS Console, which is much easier; a sketch of enabling the stream follows.
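
As a minimal sketch (the table name is a placeholder), the stream can also be enabled with boto3; the NEW_IMAGE view type matches the function below, which reads the full new item out of each event. The returned stream ARN is what you associate with the Lambda function.

import boto3

dydb = boto3.client('dynamodb')

# Enable a stream that carries the full new item (NewImage) of every insert
response = dydb.update_table(
    TableName='DYNAMODB_TABLE_NAME',
    StreamSpecification={
        'StreamEnabled': True,
        'StreamViewType': 'NEW_IMAGE'
    }
)

# This is the stream ARN to wire up as the Lambda trigger
print(response['TableDescription']['LatestStreamArn'])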

The DynamoDB stream delivers the new record through the event object; the AWS Lambda function below extracts the record information and updates the CSV file stored in AWS S3 that backs the web application.

import io
import boto3

def lambda_handler(event, context):
    bucket_name = 'S3_BUCKET_NAME'
    file_name = 'CSV_FILE_NAME'
    new_record = ''

    # build a CSV line from each newly inserted DynamoDB record
    for record in event.get('Records'):
        if record.get('eventName') == 'INSERT':
            new_image = record['dynamodb']['NewImage']
            date = new_image['date']['S']
            gold = new_image['gold']['N']
            usdx = new_image['usdx']['N']
            gold20sma = new_image['gold/20sma']['N']
            gold50sma = new_image['gold/50sma']['N']
            gold200sma = new_image['gold/200sma']['N']
            usdx20sma = new_image['usdx/20sma']['N']
            usdx50sma = new_image['usdx/50sma']['N']
            usdx200sma = new_image['usdx/200sma']['N']
            nextdaygold = new_image['nextdaygold']['N']
            new_record = ','.join([date, gold, usdx, gold20sma, gold50sma,
                                   gold200sma, usdx20sma, usdx50sma,
                                   usdx200sma, nextdaygold]) + '\n'
            print('add new record to S3: ')
            print(new_record)

    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucket_name)
    with io.BytesIO() as data:
        # download the existing CSV; the buffer position is left at the end
        bucket.download_fileobj(file_name, data)
        # append the new record
        data.write(new_record.encode('utf-8'))
        data.seek(0)
        # then write the whole file back to S3
        bucket.upload_fileobj(data, file_name, ExtraArgs={'ACL': 'public-read'})

Now everything is ready for the static web application that visualizes the performance of the machine learning model for gold price prediction. The current model may require further optimization, but I hope this way of building the application on the AWS Serverless Platform is useful and helps you get started with your own.