This is the hands-on Jupyter notebook for the article of the same name, published on the AWS AI Blog. The idea here is to show you how to create an end-to-end image classification solution using Amazon SageMaker. We will use a technique called Transfer Learning, detailed in the blog post: we will pick a ResNet-152 pre-trained on ImageNet-11K and specialize (re-train) it to classify 10 different pieces of clothing and accessories.
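To make the idea concrete, here is a minimal conceptual sketch of transfer learning in plain MXNet Gluon. The built-in SageMaker algorithm does the equivalent internally when we set use_pretrained_model=1 later on; this snippet is for illustration only (the Gluon model zoo weights are ImageNet-1K rather than ImageNet-11K), and the rest of the notebook does not depend on it.

import mxnet as mx
from mxnet import gluon
from mxnet.gluon.model_zoo import vision

# Load a ResNet-152 with weights learned on ImageNet
net = vision.resnet152_v1(pretrained=True)
# Replace the final classification layer with a fresh 10-way output;
# only this new layer starts from random weights, everything else is reused
net.output = gluon.nn.Dense(10)
net.output.initialize(mx.init.Xavier())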
This hands-on comprises four parts: 1) preparing the dataset; 2) training the model (or reusing the pre-trained one provided with the blog post); 3) deploying the model; 4) testing it.
This notebook is based on the original Amazon SageMaker sample notebook: https://github.com/awslabs/amazon-sagemaker-examples/blob/master/introduction_to_amazon_algorithms/imageclassification_caltech/Image-classification-transfer-learning.ipynb
For this experiment we will use a well-known public dataset called Fashion MNIST. The outcome of this process is a trained model capable of classifying all the categories of this dataset.
Fashion-MNIST is a dataset of Zalando's article images, consisting of a training set of 60,000 examples and a test set of 10,000 examples. Each example is a 28×28 grayscale image, associated with a label from one of 10 classes.
https://research.zalando.com/welcome/mission/research-projects/fashion-mnist/
# Base working directory and the S3 locations of the blog's support artifacts
base_dir='/tmp/fashion'
resources_base_url='https://s3.amazonaws.com/aws-ml-blog/artifacts/image-classification/fashion-mnist'
test_images=resources_base_url + '/test_images.zip'
pre_trained_model=resources_base_url + '/model.tar.gz'
Please notice that the dataset is distributed in IDX format. Since the Amazon SageMaker built-in algorithm for image classification expects a dataset formatted in RecordIO, we need to extract all the images from it and prepare the final RecordIO files (train & test).
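For reference, IDX is a simple binary format: a 4-byte magic number encoding the element type and the number of dimensions, followed by one big-endian 32-bit size per dimension, then the raw data. A minimal sketch of parsing the header (the helper read_idx_header is ours, not part of any library):

import struct

def read_idx_header(path):
    # The magic number is two zero bytes, a data-type code (0x08 = unsigned byte)
    # and the number of dimensions
    with open(path, 'rb') as f:
        zero, dtype, ndims = struct.unpack('>HBB', f.read(4))
        dims = struct.unpack('>' + 'I' * ndims, f.read(4 * ndims))
    return dtype, dims

# For the training images this should report (8, (60000, 28, 28))
# print(read_idx_header(base_dir + '/samples/train-images-idx3-ubyte'))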
!mkdir -p $base_dir/samples
!curl http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-images-idx3-ubyte.gz | gunzip > $base_dir/samples/train-images-idx3-ubyte
!curl http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-labels-idx1-ubyte.gz | gunzip > $base_dir/samples/train-labels-idx1-ubyte
!curl http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-images-idx3-ubyte.gz | gunzip > $base_dir/samples/t10k-images-idx3-ubyte
!curl http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-labels-idx1-ubyte.gz | gunzip > $base_dir/samples/t10k-labels-idx1-ubyte
!ls -lat $base_dir/samples/
All the images of a particular class will be stored in its respective directory. Since Fashion MNIST has 10 distinct classes, we'll have 10 different directories, one for each class:

fashion_mnist/
    TShirtTop/
    Trouser/
    Pullover/
    Dress/
    Coat/
    Sandal/
    Shirt/
    Sneaker/
    Bag/
    AnkleBoot/
!mkdir -p $base_dir/fashion_mnist
import os
categories = ['TShirtTop', 'Trouser', 'Pullover','Dress','Coat','Sandal','Shirt','Sneaker','Bag','AnkleBoot' ]
# Create one directory per class
for i in categories:
    try:
        os.mkdir(base_dir + '/fashion_mnist/%s' % i)
    except OSError as e:
        print(e)
!pip install python-mnist
from mnist import MNIST
from PIL import Image
import numpy as np
mndata = MNIST(base_dir + '/samples')
counter = 0

# Convert every training image into an RGB JPEG, saved inside
# the directory of its respective class
images, labels = mndata.load_training()
for i, img in enumerate(images):
    img = np.reshape(img, (28, 28))
    img = Image.fromarray(np.uint8(np.array(img)))
    img = img.convert("RGB")
    img.save(base_dir + '/fashion_mnist/%s/img_%d.jpg' % (categories[labels[i]], counter))
    counter += 1

# Do the same for the test images; both sets end up in the same
# directories and will be re-split by im2rec below
images, labels = mndata.load_testing()
for i, img in enumerate(images):
    img = np.reshape(img, (28, 28))
    img = Image.fromarray(np.uint8(np.array(img)))
    img = img.convert("RGB")
    img.save(base_dir + '/fashion_mnist/%s/img_%d.jpg' % (categories[labels[i]], counter))
    counter += 1
!ls -lat $base_dir/fashion_mnist/
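As a quick sanity check, you can count the JPEGs we just wrote; with 60,000 training and 10,000 test examples, the total should be 70,000:

!find $base_dir/fashion_mnist -name '*.jpg' | wc -l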
Right, now that we have all the images in their respective directories, one per class, it is time to create the RecordIO files. RecordIO is an optimized file format that will feed our images to the neural network during training.

We will split the dataset into training (70%) and testing (30%). To do that, we'll run im2rec.py, a Python script that ships with MXNet and is the standard tool for this job.
# Here we search sys.path for MXNet's im2rec.py script
import sys,os
suffix='/mxnet/tools/im2rec.py'
im2rec = list(filter((lambda x: os.path.isfile(x + suffix )), sys.path))[0] + suffix
%env IM2REC=$im2rec
%env BASE_DIR=$base_dir
%%bash
# OK. Here, im2rec will read all the images and create two .lst files, one for training and another for validation.
# These files will then be used for creating the RecordIO files.
cd $BASE_DIR
python $IM2REC --list --recursive --test-ratio=0.3 --train-ratio=0.7 fashion_mnist fashion_mnist/
ls *.lst
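Each line of a .lst file generated by im2rec is tab-separated: an integer index, the numeric class label (im2rec assigns labels alphabetically by directory name), and the image path relative to the root directory. For example (illustrative values):

1234    6.000000    fashion_mnist/Shirt/img_201.jpg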
%%bash
cd $BASE_DIR
# Create the RecordIO files from the .lst files; --pass-through packs the
# original JPEG bytes without re-encoding
python $IM2REC --num-thread=4 --pass-through fashion_mnist_train.lst fashion_mnist
python $IM2REC --num-thread=4 --pass-through fashion_mnist_test.lst fashion_mnist
ls *.rec
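As an optional sanity check, here is a minimal sketch that reads the first record back with MXNet's RecordIO reader. Since we packed with --pass-through, each record holds the original JPEG bytes plus a header carrying the numeric label:

import mxnet as mx

record = mx.recordio.MXRecordIO(base_dir + '/fashion_mnist_train.rec', 'r')
item = record.read()                        # raw bytes of the first record
header, img = mx.recordio.unpack_img(item)  # decode the header and the JPEG
print(header.label, img.shape)              # class index and (28, 28, 3)
record.close()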
import boto3
import sagemaker
# Get the current SageMaker session
sagemaker_session = sagemaker.Session()
bucket=sagemaker_session.default_bucket()
train_path = sagemaker_session.upload_data(path=base_dir + '/fashion_mnist_train.rec', key_prefix='fashion-mnist/train')
test_path = sagemaker_session.upload_data(path=base_dir + '/fashion_mnist_test.rec', key_prefix='fashion-mnist/test')
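upload_data sends each file to the session's default S3 bucket under the given key prefix and returns the object's S3 URI; train_path, for example, will look like s3://<your-default-bucket>/fashion-mnist/train/fashion_mnist_train.rec.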
Permissions and environment variables
Here we set up the linkage and authentication to AWS services. There are three parts to this, shown as the numbered comments in the cell below:
%%time
import boto3
import re
import os
import time
from time import gmtime, strftime
from sagemaker import get_execution_role
# 1. Obtain the role you already configured for SageMaker when you set up
# your notebook instance (https://docs.aws.amazon.com/sagemaker/latest/dg/gs-setup-working-env.html)
role = get_execution_role()
# 2. The S3 Bucket that will store the dataset and the trained model
# It was already defined above, while we uploaded the RecordIO files to the S3 bucket.
# 3. Select the correct Docker image with the Image Classification algorithm
containers = {'us-west-2': '433757028032.dkr.ecr.us-west-2.amazonaws.com/image-classification:latest',
'us-east-1': '811284229777.dkr.ecr.us-east-1.amazonaws.com/image-classification:latest',
'us-east-2': '825641698319.dkr.ecr.us-east-2.amazonaws.com/image-classification:latest',
'eu-west-1': '685385470294.dkr.ecr.eu-west-1.amazonaws.com/image-classification:latest'}
training_image = containers[boto3.Session().region_name]
print(training_image)
# The algorithm supports multiple network depths (number of layers): 18, 34, 50, 101, 152 and 200.
# For this training, we will use 152 layers
num_layers = 152
# we need to specify the input image shape (channels, height, width) for the training data;
# we converted the grayscale images to RGB above, hence the 3 channels
image_shape = "3,28,28"
# we also need to specify the number of training samples in the training set;
# our 70/30 im2rec split of the 70,000 Fashion-MNIST images yields 49,000
num_training_samples = 49000
# specify the number of output classes
num_classes = 10
# batch size for training
mini_batch_size = 1024
# number of epochs
epochs = 40
# learning rate
learning_rate = 0.00001
# Since we are using transfer learning, we set use_pretrained_model to 1 so that the network
# is initialized with the pre-trained weights and only fine-tuned on our dataset
use_pretrained_model = 1
# Training algorithm/optimizer. Default is SGD
optimizer = 'sgd'
dataset_prefix='fashion-mnist'
# create unique job name
job_name_prefix = 'fashion-mnist'
timestamp = time.strftime('-%Y-%m-%d-%H-%M-%S', time.gmtime())
job_name = job_name_prefix + timestamp
training_params = {}
# Here we set the reference for the Image Classification Docker image, stored on Amazon ECR (https://aws.amazon.com/ecr/)
training_params["AlgorithmSpecification"] = {
"TrainingImage": training_image,
"TrainingInputMode": "File"
}
# The IAM role with all the permissions given to SageMaker
training_params["RoleArn"] = role
# Here SageMaker will store the final trained model
training_params["OutputDataConfig"] = {
"S3OutputPath": 's3://{}/{}/output'.format(bucket, job_name_prefix)
}
# This is the config of the instance that will execute the training
training_params["ResourceConfig"] = {
"InstanceCount": 1,
"InstanceType": "ml.p2.xlarge",
"VolumeSizeInGB": 50
}
# The job name. You'll see this name in the Jobs section of the SageMaker console
training_params["TrainingJobName"] = job_name
# Here you will configure the hyperparameters used for training your model.
training_params["HyperParameters"] = {
"image_shape": image_shape,
"num_layers": str(num_layers),
"num_training_samples": str(num_training_samples),
"num_classes": str(num_classes),
"mini_batch_size": str(mini_batch_size),
"epochs": str(epochs),
"learning_rate": str(learning_rate),
"use_pretrained_model": str(use_pretrained_model),
"optimizer": optimizer
}
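# Note: the low-level CreateTrainingJob API expects every hyperparameter
# value as a string, hence the str() conversions above.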
# Training timeout
training_params["StoppingCondition"] = {
"MaxRuntimeInSeconds": 360000
}
# The algorithm currently only supports the FullyReplicated distribution type (data is copied onto each machine)
training_params["InputDataConfig"] = []
# Please notice that we're using application/x-recordio for both
# training and validation datasets, given our dataset is formatted in RecordIO
# Here we set the training dataset
# Training data should be inside a subdirectory called "train"
training_params["InputDataConfig"].append({
"ChannelName": "train",
"DataSource": {
"S3DataSource": {
"S3DataType": "S3Prefix",
"S3Uri": 's3://{}/{}/train/'.format(bucket, dataset_prefix),
"S3DataDistributionType": "FullyReplicated"
}
},
"ContentType": "application/x-recordio",
"CompressionType": "None"
})
# Here we set the validation dataset
# Validation data should be inside a subdirectory called "validation"
training_params["InputDataConfig"].append({
"ChannelName": "validation",
"DataSource": {
"S3DataSource": {
"S3DataType": "S3Prefix",
"S3Uri": 's3://{}/{}/test/'.format(bucket, dataset_prefix),
"S3DataDistributionType": "FullyReplicated"
}
},
"ContentType": "application/x-recordio",
"CompressionType": "None"
})
print('Training job name: {}'.format(job_name))
print('\nInput Data Location: {}'.format(training_params['InputDataConfig'][0]['DataSource']['S3DataSource']))
You'll get your model into production in four steps: run the training job, register the resulting model, create an endpoint configuration, and create the endpoint. First, launch and monitor the training job:
# Get the low-level SageMaker client; note that from here on the name
# `sagemaker` refers to this boto3 client, no longer to the SDK module imported above
sagemaker = boto3.client(service_name='sagemaker')
# create the Amazon SageMaker training job
sagemaker.create_training_job(**training_params)
# confirm that the training job has started
status = sagemaker.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']
print('Training job current status: {}'.format(status))
try:
    # wait for the job to finish and report the ending status
    sagemaker.get_waiter('training_job_completed_or_stopped').wait(TrainingJobName=job_name)
    training_info = sagemaker.describe_training_job(TrainingJobName=job_name)
    status = training_info['TrainingJobStatus']
    print("Training job ended with status: " + status)
except Exception:
    # if an exception is raised, the job did not complete successfully
    message = sagemaker.describe_training_job(TrainingJobName=job_name)['FailureReason']
    print('Training failed with the following error: {}'.format(message))
%%time
import boto3
from time import gmtime, strftime
# Set use_pretrained_model to False to deploy the model you trained above
# instead of the one provided with the blog post
use_pretrained_model = True
model_name="fashion-mnist" + time.strftime('-%Y-%m-%d-%H-%M-%S', time.gmtime())
print(model_name)
if use_pretrained_model:
    # Copy the model provided with the blog post to your bucket,
    # unless it is already there
    prefix = "fashion-mnist/model/model.tar.gz"
    model_data = "s3://{}/{}".format(bucket, prefix)
    s3 = boto3.client('s3')
    resp = s3.list_objects(Bucket=bucket, Prefix=prefix)
    if resp.get("Contents") is None:
        print("Please wait. It will take around 6 minutes")
        !curl -s $pre_trained_model | aws s3 cp - s3://$bucket/fashion-mnist/model/model.tar.gz
else:
    # Use the artifacts produced by the training job above
    info = sagemaker.describe_training_job(TrainingJobName=job_name)
    model_data = info['ModelArtifacts']['S3ModelArtifacts']
print(model_data)
primary_container = {
    'Image': training_image,
    'ModelDataUrl': model_data,
}

try:
    create_model_response = sagemaker.create_model(
        ModelName=model_name,
        ExecutionRoleArn=role,
        PrimaryContainer=primary_container)
    print(create_model_response['ModelArn'])
except Exception as e:
    print(e)
Amazon SageMaker supports configuring REST endpoints that host multiple models, e.g. for A/B testing purposes. To support this, you create an endpoint configuration that describes how traffic is distributed across the models, whether split, shadowed, or sampled in some way.
In addition, the endpoint configuration describes the instance type required for model deployment and the autoscaling configuration. Note that create_model above only registered the model metadata; no instances are launched until the endpoint is created.
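For illustration only, here is a minimal sketch of an endpoint configuration that splits traffic 90/10 between two model versions. The model names below are placeholders, not created by this notebook, which deploys a single variant:

# Hypothetical A/B configuration; 'fashion-mnist-model-a' and '-b' are placeholders
sagemaker.create_endpoint_config(
    EndpointConfigName='fashion-mnist-ab-epc',
    ProductionVariants=[
        {'VariantName': 'ModelA', 'ModelName': 'fashion-mnist-model-a',
         'InstanceType': 'ml.c4.2xlarge', 'InitialInstanceCount': 1,
         'InitialVariantWeight': 0.9},  # ~90% of the requests
        {'VariantName': 'ModelB', 'ModelName': 'fashion-mnist-model-b',
         'InstanceType': 'ml.c4.2xlarge', 'InitialInstanceCount': 1,
         'InitialVariantWeight': 0.1},  # ~10% of the requests
    ])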
from time import gmtime, strftime
timestamp = time.strftime('%Y-%m-%d-%H-%M-%S', time.gmtime())
endpoint_config_name = job_name_prefix + '-epc-' + timestamp
endpoint_config_response = sagemaker.create_endpoint_config(
EndpointConfigName = endpoint_config_name,
ProductionVariants=[{
'InstanceType':'ml.c4.2xlarge',
'InitialInstanceCount':1,
'ModelName':model_name,
'VariantName':'AllTraffic'}])
print('Endpoint configuration name: {}'.format(endpoint_config_name))
print('Endpoint configuration arn: {}'.format(endpoint_config_response['EndpointConfigArn']))
Lastly, you will create the endpoint that serves the model, by specifying the name and the configuration defined above. The end result is an endpoint that can be validated and incorporated into production applications. This takes 9-11 minutes to complete.
%%time
import time
timestamp = time.strftime('%Y-%m-%d-%H-%M-%S', time.gmtime())
endpoint_name = job_name_prefix + '-ep-' + timestamp
print('Endpoint name: {}'.format(endpoint_name))
endpoint_params = {
'EndpointName': endpoint_name,
'EndpointConfigName': endpoint_config_name,
}
endpoint_response = sagemaker.create_endpoint(**endpoint_params)
print('EndpointArn = {}'.format(endpoint_response['EndpointArn']))
# get the status of the endpoint
response = sagemaker.describe_endpoint(EndpointName=endpoint_name)
status = response['EndpointStatus']
print('EndpointStatus = {}'.format(status))
# wait until the status has changed
sagemaker.get_waiter('endpoint_in_service').wait(EndpointName=endpoint_name)
# print the status of the endpoint
endpoint_response = sagemaker.describe_endpoint(EndpointName=endpoint_name)
status = endpoint_response['EndpointStatus']
print('Endpoint creation ended with EndpointStatus = {}'.format(status))
if status != 'InService':
    raise Exception('Endpoint creation failed.')
Let's recapitulate. We've just trained an image classifier model using Fashion MNIST. What does that mean? It means that we now have a web service, accessible through the endpoint we just deployed, that can classify 10 different types of objects: 1) Ankle Boot; 2) Bag; 3) Coat; 4) Dress; 5) Pullover; 6) Sandal; 7) Shirt; 8) Sneaker; 9) TShirt/Top; 10) Trouser.
With that in mind, let's test our model on some examples and see what happens. If you decided to use the pre-trained model in step 2/4, you'll see that the model is capable of making good predictions for the test images.

In the test_data directory you will find 5 images of real items, as you can see below.
!curl -s $test_images -o /tmp/test_images.zip
!mkdir -p $base_dir/test_data
!unzip /tmp/test_images.zip -d $base_dir/test_data
!rm -f /tmp/test_images.zip
%matplotlib inline
import matplotlib.pyplot as plt
from PIL import Image
test_categories = ['Shirt','TShirtTop', 'AnkleBoot', 'Sneaker', 'Bag']
f, axarr = plt.subplots(1, 5, figsize=(20,12))
col = 0
for i in range(5):
    im = Image.open(base_dir + '/test_data/item%d_thumb.jpg' % (i+1))
    axarr[col].text(0, 0, '%s' % (test_categories[i]), fontsize=15, color='blue')
    frame = axarr[col].imshow(im)
    col += 1
plt.show()
import json
import numpy as np
from io import BytesIO
runtime = boto3.Session().client(service_name='sagemaker-runtime')
# im2rec assigned the numeric labels alphabetically by directory name,
# so this is the label order the model was trained with
object_categories = ['AnkleBoot','Bag','Coat','Dress','Pullover','Sandal','Shirt','Sneaker','TShirtTop','Trouser']
_, axarr = plt.subplots(1, 5, figsize=(20,12))
col = 0
for i in range(5):
    # Load the image bytes
    img = open(base_dir + '/test_data/item%d_thumb.jpg' % (i+1), 'rb').read()

    # Call your model for predicting which object appears in this image
    response = runtime.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType='application/x-image',
        Body=bytearray(img)
    )

    # read the prediction result and parse the json
    result = response['Body'].read()
    result = json.loads(result)

    # which category has the highest confidence?
    pred_label_id = np.argmax(result)

    # Green when our model predicted correctly, otherwise, Red
    text_color = 'red'
    if object_categories[pred_label_id] == test_categories[i]:
        text_color = 'green'

    # Render the text for each image/prediction
    output_text = '%s (%f)' % (object_categories[pred_label_id], result[pred_label_id])
    axarr[col].text(0, 0, output_text, fontsize=15, color=text_color)
    print(output_text)

    # Render the image
    img = Image.open(BytesIO(img))
    frame = axarr[col].imshow(img)
    col += 1
plt.show()
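Each call to invoke_endpoint returns a JSON array with one confidence value per class, in the object_categories order above; np.argmax finds the index of the highest confidence, which is the predicted class, and result[pred_label_id] is its confidence.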
When we're done with the endpoint, we can just delete it and the backing instances will be released. Run the following cell to delete the endpoint.
sagemaker.delete_endpoint(EndpointName=endpoint_name)
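Optionally, you can also remove the endpoint configuration and the model; both calls below belong to the same boto3 SageMaker client used throughout this notebook:

sagemaker.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
sagemaker.delete_model(ModelName=model_name)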
!rm -rf $base_dir