While studying for the Azure Data Scientist Associate certification, I took notes from the Building AI Solution with Azure ML course. In this single page, you'll find the entire content of the course (as of 18th August, 2020). This page is intended as a small aid for those preparing to earn the certification.
Intro
Azure ML Workspace
Workspaces are Azure resources. They include:
- compute
- notebooks
- pipelines
- data
- experiments
- models
Created alongside the workspace:
- storage account: files used by the workspace + data
- Application Insights
- key vault
- VM
- container registry
Permissions: RBAC
Editions: Basic (no graphical designer), Enterprise
Tools
Azure ML Studio: designer (no-code ML model development), automated ML
Azure ML SDK
Azure ML CLI Extensions
Compute Instances: choose a VM size; notebooks are stored independently of the VMs
VS Code: Azure ML extension
Experiments
Azure ML tracks runs of experiments.
...
run = experiment.start_logging()
...
run.complete()
- Logging metrics: run.log('name', value). You can review them via RunDetails(run).show().
- Experiment output files, eg trained models: run.upload_file(..)
Script as an experiment. In the script, you can get the context: run = Run.get_context(). To run it, you define:
- RunConfiguration: the Python environment
- ScriptRunConfig: associates the RunConfiguration with the script
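Putting these together, a minimal sketch might look like the following (folder, script, and experiment names are placeholders):
from azureml.core import Experiment, ScriptRunConfig, Workspace
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies

ws = Workspace.from_config()

# python environment for the run
run_config = RunConfiguration()
run_config.environment.python.conda_dependencies = CondaDependencies.create(
    conda_packages=['scikit-learn']
)

# associate the run configuration with the script
script_config = ScriptRunConfig(
    source_directory='experiment_folder',
    script='experiment.py',
    run_config=run_config
)

experiment = Experiment(workspace=ws, name='script-experiment')
run = experiment.submit(config=script_config)
run.wait_for_completion(show_output=True)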
Train a ML model
Estimators
Estimator: encapsulates a run configuration and a script run configuration in a single object. Save the trained model as a pickle file in the outputs folder.
estimator = Estimator(
source_directory='experiment',
entry_script='training.py',
compute_target='local',
conda_packages=['scikit-learn']
)
experiment = Experiment(workspace, name='train_experiment')
run = experiment.submit(config=estimator)
Framework-specific estimators simplify configuration:
from azureml.train.sklearn import SKLearn
estimator = SKLearn(
source_directory='experiment',
entry_script='training.py',
compute_target='local'
)
Script parameters
Use argparse to read the parameters in a script (eg the regularization rate). To pass a parameter to an Estimator:
estimator = SKLearn(
source_directory='experiment',
entry_script='training.py',
script_params={'--reg_rate': 0.1},
compute_target='local'
)
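Inside training.py, the script might read the parameter like this (a minimal sketch):
# training.py (sketch): read the --reg_rate value passed via script_params
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--reg_rate', type=float, dest='reg_rate', default=0.01)
args = parser.parse_args()
reg = args.reg_rate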
Registering models
Once the experiment Run has completed, you can retrieve its outputs (eg the trained model).
run.download_file(name='outputs/model.pkl', output_file_path='model.pkl')
Registering a model allows you to track multiple versions of a model.
model = Model.register(
workspace=ws,
model_name='classification_model',
model_path='model.pkl', #local path
description='a classification model',
tags={'dept': 'sales'},
model_framework=Model.Framework.SCIKITLEARN,
model_framework_version='0.20.3'
)
or register from run:
run.register_model(
...
model_path='outputs/model.pkl'
...
)
Datastores
Abstractions of cloud data sources encapsulating the information required to connect.
You can register a data store
- via ML Studio
- via SDK
ws = Workspace.from_config()
blob = Datastore.register_azure_blob_container(
workspace=ws,
datastore_name='blob_data',
container_name='data_container',
account_name='az_acct',
account_key='123456'
)
In the SDK, you can list data stores.
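For example (datastore names here are illustrative):
# enumerate the datastores registered in the workspace
for ds_name in ws.datastores:
    print(ds_name)

# retrieve a specific datastore, and the workspace default
blob_store = Datastore.get(ws, datastore_name='blob_data')
default_store = ws.get_default_datastore()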
Use datastores
The most common are Azure Blob storage and Azure file storage.
blob_ds.upload(
src_dir='/files',
target_path='/data/files',
overwrite=True
)
blob_ds.download(
target_path='downloads',
prefix='/data'
)
To use a datastore in a script, you pass a data reference to the script. Data access modes:
- download: the contents of the datastore path are downloaded to the compute context of the experiment
- upload: files generated by the experiment are uploaded to the datastore after the run
- mount: the datastore path is mounted as remote storage (only on a remote compute target)
Pass reference as script parameter:
data_ref = blob_ds.path('data/files').as_download(path_on_compute='training_data')
estimator = SKLearn(
source_directory='experiment_folder',
entry_script='training_script.py',
compute_target='local',
script_params={'--data_folder': data_ref}
)
Retrieve it in the script and use it like a local folder:
parser = argparse.ArgumentParser()
parser.add_argument('--data_folder', type=str, dest='data_folder')
args = parser.parse_args()
data_files = os.listdir(args.data_folder)
Datasets
Datasets are versioned, packaged data objects that can be consumed in experiments and pipelines. Types:
- tabular: read as table
- file: list of file paths
You can create a dataset via Azure ML Studio or via the SDK. File paths can include wildcards (/files/*.csv).
Once a dataset is created, you can register it in the workspace so it is available later too.
Tabular:
from azureml.core import Dataset
blob_ds = ws.get_default_datastore()
csv_paths = [
(blob_ds, 'data/files/current_data.csv'),
(blob_ds, 'data/files/archive/*.csv')
]
tab_ds = Dataset.Tabular.from_delimited_files(path=csv_paths)
tab_ds = tab_ds.register(workspace, name='csv_table')
File:
blob_ds = ws.get_default_datastore()
file_ds = Dataset.File.from_files(path=(blob_ds, 'data/files/images/*.jpg'))
file_ds = file_ds.register(workspace=ws, name='img_files')
Retrieve a dataset
ws = Workspace.from_config()
# Get a dataset from workspace datasets collection
ds1 = ws.datasets['csv_table']
# Get a dataset by name from the datasets class
ds2 = Dataset.get_by_name(ws, 'img_files')
Datasets can be versioned. Create a new version by registering with the same name and the create_new_version property:
file_ds = file_ds.register(workspace=ws, name='img_files', create_new_version=True)
Retrieve specific version:
img_ds = Dataset.get_by_name(workspace=ws, name='img_files', version=2)
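A registered dataset can also be passed to a training script as a named input and read from the run context; a sketch (names are illustrative):
# pass the tabular dataset to the estimator as a named input
estimator = SKLearn(
    source_directory='experiment_folder',
    entry_script='training_script.py',
    compute_target='local',
    inputs=[tab_ds.as_named_input('training_data')]
)

# inside training_script.py:
# run = Run.get_context()
# data = run.input_datasets['training_data'].to_pandas_dataframe()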
Compute Contexts
The runtime context for each experiment consists of
- environment for the script, which includes all packages
- compute target on which the environment will be deployed
Intro to Environments
Python runs in virtual environments (eg Conda, pip). Azure ML creates a Docker container and builds the environment inside it. You can create an environment:
- from a Conda or pip YAML specification file, which you then load:
env = Environment.from_conda_specification(name='training_env', file_path='./conda.yml')
- from an existing Conda environment:
env = Environment.from_conda_environment(name='training_env',
conda_environment_name='py_env')
- by specifying packages:
env = Environment('training_env')
deps = CondaDependencies.create(conda_packages=['pandas', 'numpy'],
                                pip_packages=['azureml-defaults'])
env.python.conda_dependencies = deps
Once created, you can register the environment in the workspace.
env.register(workspace=ws)
Retrieve it and assign it to a ScriptRunConfig or an Estimator:
tr_env = Environment.get(workspace=ws, name='training_env')
estimator = Estimator(
source_directory='experiment_folder',
entry_script='training_script.py',
compute_target='local',
environment_definition=tr_env
)
Compute targets
Compute targets are physical or virtual computers on which experiments are run. Types of compute:
- local compute: your workstation or a virtual machine
- compute clusters: multi-node clusters of VMs that automatically scale up or down
- inference clusters: used to deploy trained models; they use containers to provide the compute
- attached compute: attach a VM or Databricks cluster that you already use
You can create a compute target via AML studio or via the SDK. A managed compute target is one managed by AML. Via the SDK:
ws = Workspace.from_config()
compute_name = 'aml-cluster'
compute_config = AmlCompute.provisioning_configuration(
vm_size='STANDARD_DS12_V2',
min_nodes=0,
max_nodes=4,
vm_priority='dedicated'
)
aml_cluster = ComputeTarget.create(ws, compute_name, compute_config)
aml_cluster.wait_for_completion()
An unmanaged compute target is defined and managed outside AML. You can attach it via SDK:
ws = Workspace.from_config()
compute_name = 'db-cluster'
db_workspace_name = 'db_workspace'
db_resource_group = 'db_resource_group'
db_access_token = 'aocsinaocnasoivn'
db_config = DatabricksCompute.attach_configuration(
resource_group=db_resource_group,
workspace_name=db_workspace_name,
access_token=db_access_token
)
db_cluster = ComputeTarget.create(ws, compute_name, db_config)
db_cluster.wait_for_completion()
You can check whether a compute target already exists:
compute_name = 'aml_cluster'
try:
aml_cluster = ComputeTarget(workspace=ws, name=compute_name)
except ComputeTargetException:
# create it
...
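A complete version of this pattern might look like this (a sketch; the VM size and node count are illustrative):
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

compute_name = 'aml_cluster'
try:
    # use the existing compute target if it is already there
    aml_cluster = ComputeTarget(workspace=ws, name=compute_name)
except ComputeTargetException:
    # otherwise provision a new cluster
    compute_config = AmlCompute.provisioning_configuration(
        vm_size='STANDARD_DS12_V2',
        max_nodes=4
    )
    aml_cluster = ComputeTarget.create(ws, compute_name, compute_config)
aml_cluster.wait_for_completion(show_output=True)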
You can use a compute target in an experiment run by specifying it as a parameter:
compute_name = 'aml_cluster'
training_env = Environment.get(workspace=ws, name='training_env')
estimator = Estimator(
source_directory='experiment_folder',
entry_script='training_script.py',
environment_definition=training_env,
compute_target=compute_name
)
# or specify a ComputeTarget object
training_cluster = ComputeTarget(workspace=ws, name=compute_name)
estimator = Estimator(
source_directory='experiment_folder',
entry_script='training_script.py',
environment_definition=training_env,
compute_target=training_cluster
)
Orchestrating with Pipelines
A pipeline is a workflow of ML tasks in which each task is implemented as a step (steps can run sequentially or in parallel). You can combine different compute targets. Common types of step:
- PythonScriptStep
- EstimatorStep: runs an estimator
- DataTransferStep: uses ADF
- DatabricksStep
- AdlaStep: runs a U-SQL job in Azure Data Lake Analytics
Define steps:
step1 = PythonScriptStep(
name='prepare data',
source_directory='scripts',
script_name='data_prep.py',
compute_target='aml-cluster',
runconfig=run_config
)
step2 = EstimatorStep(
name='train model',
estimator=sk_estimator,
compute_target='aml-cluster'
)
Assign steps to pipeline:
train_pipeline = Pipeline(
workspace=ws,
steps=[step1,step2]
)
# create experiment and run pipeline
experiment = Experiment(workspace=ws, name='training-pipeline')
pipeline_run = experiment.submit(train_pipeline)
Pass data between steps
The PipelineData object is a special kind of DataReference that
- references a location in a datastore
- creates a data dependency between pipeline steps
To pass it:
- define a PipelineData object that references a location in a datastore
- specify the object as an input or output for the steps that use it
- pass the PipelineData object as a script parameter in steps that run scripts
Example
raw_ds = Dataset.get_by_name(ws, 'raw_dataset')
# Define object to pass data between steps
data_store = ws.get_default_datastore()
prepped_data = PipelineData('prepped', datastore=data_store)
step1 = PythonScriptStep(
name='prepare data',
source_directory='scripts',
script_name='data_prep.py',
compute_target='aml-cluster',
runconfig=run_config,
# specify dataset
inputs = [raw_ds.as_named_input('raw_data')],
# specify PipelineData as output
outputs = [prepped_data],
# script reference
arguments = ['--folder', prepped_data]
)
step2 = EstimatorStep(
name='train model',
estimator=sk_estimator,
compute_target='aml-cluster',
# specify PipelineData
inputs = [prepped_data],
# pass reference to estimator script
estimator_entry_script_arguments = ['--folder', prepped_data]
)
Inside the script, you can get a reference to the PipelineData object from the argument and use it like a local folder.
parser = argparse.ArgumentParser()
parser.add_argument('--folder', type=str, dest='folder')
args = parser.parse_args()
output_folder = args.folder
# ...
# save data to PipelineData location
os.makedirs(output_folder, exist_ok=True)
output_path = os.path.join(output_folder, 'prepped_data.csv')
df.to_csv(output_path)
Reuse steps
By default, the step output from a previous pipeline run is reused without rerunning the step (if script, source directory and other params have not changed). You can control this:
step1 = PythonScriptStep(
#...
allow_reuse=False
)
You can force the steps to run regardless of individual configuration:
pipeline_run = experiment.submit(train_pipeline, regenerate_outputs=True)
Publish pipelines
You can publish a pipeline to create a REST endpoint through which the pipeline can be run on demand.
published_pipeline = pipeline.publish(
name='training_pipeline',
description='Model training pipeline',
version='1.0'
)
You can view it in ML Studio and get the endpoint:
published_pipeline.endpoint
You start a published pipeline by making an HTTP request to its endpoint. You pass the authorisation header (with a token) and a JSON payload specifying the experiment name. The pipeline runs asynchronously; you get the run ID in the response.
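For example, a sketch using interactive authentication (the experiment name is illustrative):
import requests
from azureml.core.authentication import InteractiveLoginAuthentication

# get an authorisation header for the current user
interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()

response = requests.post(
    published_pipeline.endpoint,
    headers=auth_header,
    json={'ExperimentName': 'run_training_pipeline'}
)
run_id = response.json()['Id']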
Pipeline parameters
Create a PipelineParameter object for each parameter. Example:
reg_param = PipelineParameter(name='reg_rate', default_value=0.01)
# ...
step2 = EstimatorStep(
# ...
estimator_entry_script_arguments=[
'--folder', prepped,
'--reg', reg_param
]
)
After you publish a parametrised pipeline, you can pass parameter values in the JSON payload of the REST interface. Example
requests.post(
endpoint,
headers=auth_header,
json={
'ExperimentName': 'run_training_pipeline',
'ParameterAssignments': {
'reg_rate': 0.1
}
}
)
Schedule pipelines
Define a ScheduleRecurrence and use it to create a Schedule.
daily = ScheduleRecurrence(
frequency='Day',
interval=1
)
pipeline_schedule = Schedule.create(
ws,
name='Daily Training',
description='train model every day',
pipeline_id=published_pipeline.id,
experiment_name='Training_Pipeline',
recurrence=daily
)
To schedule a pipeline to run whenever data changes, you must create a Schedule that monitors a specific path on a datastore:
training_datastore = Datastore(workspace=ws, name='blob_data')
pipeline_schedule = Schedule.create(
# ...
datastore=training_datastore,
path_on_datastore='data/training'
)
Deploy ML Models
You can deploy a model as a container to several compute targets:
- Azure ML compute instance
- Azure container instance
- Azure function
- Azure Kubernetes service
- IoT module
Steps
- register the model
- inference configuration
- deployment configuration
- deploy model
Register the model
After training, you must register the model in the Azure ML workspace.
classification_model = Model.register(
workspace=ws,
model_name='classification_model',
model_path='model.pkl',
description='A classification model'
)
Or you can use the reference to the run:
run.register_model(
model_name='classification_model',
model_path='outputs/model.pkl',
description='A classification model'
)
Inference configuration
The model will be deployed as a service consisting of
- a script to load the model and return predictions for submitted data
- an environment in which the script will be run
Create the entry script (or scoring script) as a Python file including 2 functions:
- init(): called when the service is initialised (load the model from the registry)
- run(raw_data): called when new data is submitted to the service (generate predictions)
Example
import json
import joblib
import numpy as np
from azureml.core.model import Model

def init():
    global model
    model_path = Model.get_model_path('classification_model')
    model = joblib.load(model_path)

def run(raw_data):
    data = np.array(json.loads(raw_data)['data'])
    predictions = model.predict(data)
    # return predictions in any JSON-serializable format
    return predictions.tolist()
You can configure the environment using Conda. You can use the CondaDependencies class to create a default environment (including azureml-defaults and other commonly used packages) and add any other required packages. You then serialize the environment to a string and save it.
myenv = CondaDependencies()
myenv.add_conda_package('scikit-learn')
env_file = 'service_files/env.yml'
with open(env_file, 'w') as f:
f.write(myenv.serialize_to_string())
After creating the script and the environment, you combine them in an InferenceConfig:
classifier_inference_config = InferenceConfig(
runtime='python',
source_directory='service_files',
entry_script='score.py',
conda_file='env.yml'
)
Deployment configuration
Now that you have the entry script and the environment, you configure the compute to which the service will be deployed. If you deploy to an AKS cluster, you must create the cluster first:
cluster_name = 'aks-cluster'
compute_config = AksCompute.provisioning_configuration(location='eastus')
production_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
production_cluster.wait_for_completion()
Then you define the deployment configuration:
classifier_deploy_config = AksWebservice.deploy_configuration(
cpu_cores=1,
memory_gb=1
)
Deploy the model
model = ws.models['classification_model']
service = Model.deploy(
    workspace=ws,
    name='classification-service',
    models=[model],
    inference_config=classifier_inference_config,
    deployment_config=classifier_deploy_config,
    deployment_target=production_cluster
)
service.wait_for_deployment()
Consuming a real-time inferencing service
For testing, you can use the AML SDK to call the web service through the run method of a WebService object. Typically, you send data to the run method as JSON like:
{
'data':[
[0.1, 0.2, 3.4],
[0.9, 8.2, 2.5],
...
]
}
The response is JSON with a prediction for each case:
response = service.run(input_data=json_data)
predictions = json.loads(response)
In production, you use a REST endpoint. You find the endpoint of a deployed service in Azure ML studio, or by retrieving the scoring_uri property of the Webservice object:
endpoint = service.scoring_uri
There are 2 kinds of authentication:
- key: requests are authenticated by specifying the key associated with the service
- token: requests are authenticated by providing a JSON Web Token (JWT)
By default, authentication is disabled for Azure Container Instance service (set to key-based authentication for AKS).
To make an authenticated call to the REST endpoint, you include the key or the token in the request header.
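For example, with key-based authentication (a sketch; it reuses the service, endpoint, and JSON payload from above):
import requests

# retrieve the authentication keys for the deployed service
primary_key, secondary_key = service.get_keys()

headers = {
    'Content-Type': 'application/json',
    'Authorization': 'Bearer ' + primary_key
}
response = requests.post(endpoint, data=json_data, headers=headers)
predictions = response.json()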
Troubleshooting service deployment
You can
- check the service state (should be healthy):
service.state
- review service logs:
service.get_logs()
- deploy to a local container (see the sketch below)
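Deploying to a local container is a quick way to debug; a sketch (reusing the model and inference configuration from the earlier example):
from azureml.core.webservice import LocalWebservice

deployment_config = LocalWebservice.deploy_configuration(port=8890)
service = Model.deploy(
    workspace=ws,
    name='test-svc',
    models=[model],
    inference_config=classifier_inference_config,
    deployment_config=deployment_config
)
service.wait_for_deployment(show_output=True)
print(service.get_logs())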
Batch inference pipelines
A pipeline to read input data, load a registered model, predict labels, and write the results. Steps:
- Register a model
- Create a scoring script. The run(mini_batch) method makes the inference on each batch (see the sketch below).
- Create a pipeline with ParallelRunStep
- Run the pipeline and retrieve the step output
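A minimal scoring script for a ParallelRunStep might look like this (a sketch; the model name and the CSV parsing are assumptions):
import os
import joblib
import numpy as np
from azureml.core import Model

def init():
    # runs once per node when the step starts: load the registered model
    global model
    model_path = Model.get_model_path('classification_model')
    model = joblib.load(model_path)

def run(mini_batch):
    # called for each mini-batch of files; return one result per input file
    results = []
    for f in mini_batch:
        data = np.genfromtxt(f, delimiter=',')
        prediction = model.predict(data.reshape(1, -1))
        results.append(f"{os.path.basename(f)}: {prediction[0]}")
    return results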
Azure ML provides a pipeline step that performs parallel batch inference. Using the ParallelRunStep class, you can read batches of files from a File dataset and write the output to a PipelineData reference. You can set the output_action to "append_row" (ensuring all instances of the step collate their results into a single output file named parallel_run_step.txt).
batch_data_set = ws.datasets['batch-data']
# output location
default_ds = ws.get_default_datastore()
output_dir = PipelineData(
name='inferences',
datastore=default_ds,
output_path_on_compute='results'
)
parallel_run_config = ParallelRunConfig(
source_directory='batch_scripts',
entry_script='batch_scoring_script.py',
mini_batch_size="5",
error_threshold=10,
output_action="append_row",
environment=batch_env,
compute_target=aml_cluster,
node_count=4
)
parallelrun_step = ParallelRunStep(
name="batch-score",
parallel_run_config=parallel_run_config,
inputs=[batch_data_set.as_named_input('batch_data')],
output=output_dir,
arguments=[],
allow_reuse=True
)
pipeline = Pipeline(
workspace=ws,
steps=[parallelrun_step]
)
Run the pipeline and retrieve output.
pipeline_run = Experiment(ws, 'batch_prediction_pipeline').submit(pipeline)
pipeline_run.wait_for_completion()
prediction_run = next(pipeline_run.get_children())
prediction_output = prediction_run.get_output_data('inferences')
prediction_output.download(local_path='results')
Publishing a batch inference pipeline
You can publish it as a REST service.
published_pipeline = pipeline_run.publish_pipeline(
name='Batch_Prediction_Pipeline',
description='Batch Pipeline',
version='1.0'
)
rest_endpoint = published_pipeline.endpoint
Once published, you can use the endpoint to initiate a batch inferencing job.
You can also schedule the published pipeline to have it run automatically.
weekly = ScheduleRecurrence(frequency='Week', interval=1)
pipeline_schedule = Schedule.create(
ws,
name='Weekly Predictions',
description='batch inferencing',
pipeline_id=published_pipeline.id,
experiment_name='Batch_Prediction',
recurrence=weekly
)
Tuning hyperparameters
Accomplished by training multiple models, using the same algorithm and training data but different hyperparameter values. The performance metric (eg accuracy) is then evaluated for each model, and the best-performing one is selected.
In Azure ML, you create an experiment that consists of a hyperdrive run, which initiates a child run for each hyperparameter combination. Each child run uses a training script with parametrised hyperparameter values to train a model, and logs the target performance metric achieved by the trained model.
Define a search space
Depends on the type of hyperparameter:
- discrete. Make a choice out of:
  - an explicit Python list: choice([10, 20, 30])
  - a range: choice(range(1,10))
  - values selected from a discrete distribution: qnormal, quniform, qlognormal, qloguniform
- continuous. Use any of these distributions: normal, uniform, lognormal, loguniform
Define a search space by creating a dictionary with parameter expressions for each hyperparameter.
from azureml.train.hyperdrive import choice, normal
param_space = {
'--batch_size': choice(16, 32, 64),
'--learning_rate': normal(10, 3)
}
Configuring sampling
The values used in a tuning run depend on the type of sampling used.
Grid sampling. Tries every possible combination; it can only be used when all hyperparameters are discrete.
param_space = {
'--batch_size': choice(16, 32, 64),
'--learning_rate': choice(10, 20)
}
param_sampling = GridParameterSampling(param_space)
Random sampling. Randomly selects a value for each hyperparameter.
param_space = {
'--batch_size': choice(16, 32, 64),
'--learning_rate': normal(10, 3)
}
param_sampling = RandomParameterSampling(param_space)
Bayesian sampling. Based on a Bayesian optimisation algorithm that tries to select parameter combinations that will result in improved performance over the previous selection.
param_space = {
'--batch_size': choice(16, 32, 64),
'--learning_rate': uniform(0.5, 0.1)
}
param_sampling = BayesianParameterSampling(param_space)
Bayesian sampling can only be used with the choice, uniform, and quniform distributions, and can't be combined with an early termination policy.
Configuring an early termination
Typically, you set a maximum number of iterations, but this could still result in a large number of runs that don't result in a better model than a combination that has already been tried.
To help prevent wasting time, you can set an early termination policy that abandons runs that are unlikely to produce a better result than previously completed runs. The policy is evaluated at an evaluation interval you specify, based on each time the target performance metric is logged. You can also set a delay evaluation parameter to avoid evaluating the policy until a minimum number of iterations have been completed.
Note. Early termination is particularly useful for deep learning scenarios where a deep neural network is trained iteratively over a number of epochs. The training script can report the target metric after each epoch, and if the run is significantly underperforming previous runs after the same number of intervals, it can be abandoned.
Bandit policy. Stop a run if the target performance metric underperforms the best run so far by a specified margin.
early_termination_policy = BanditPolicy(
slack_amount=0.2, # abandon runs when metric is 0.2 or more worse than best run after the same number of intervals
evaluation_interval=1,
delay_evaluation=5
)
You can also use a slack factor, which compares the metric as a ratio rather than an absolute value.
Median stopping policy. Abandons runs where the target performance metric is worse than the median of the running averages of all runs.
early_termination_policy = MedianStoppingPolicy(
evaluation_interval=1,
delay_evaluation=5
)
Truncation selection policy. Cancels the lowest performing X% of runs at each evaluation interval, based on the truncation_percentage value you specify for X.
early_termination_policy = TruncationSelectionPolicy(
truncation_percentage=10,
evaluation_interval=1,
delay_evaluation=5
)
Running a hyperparameter tuning experiment
In Azure ML, you tune hyperparameters by running a hyperdrive experiment. You need to create a training script just the way you would for any other training experiment, except that you must:
- include an argument for each hyperparameter
- log the target performance metric.
This example script trains a logistic regression model using a --regularization argument (the regularization rate), and logs the accuracy.
import argparse
import os
import joblib
import numpy as np
from azureml.core import Run
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression

parser = argparse.ArgumentParser()
parser.add_argument('--regularization', type=float, dest='reg_rate', default=0.01)
args = parser.parse_args()
reg = args.reg_rate
# get experiment run context
run = Run.get_context()
data = run.input_datasets['training_data'].to_pandas_dataframe()
X = data[['feature1', 'feature2', 'feature3', 'feature4']].values
y = data['label'].values
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)
model = LogisticRegression(C=1/reg, solver='liblinear').fit(X_train, y_train)
# calculate and log accuracy
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
run.log('Accuracy', np.float(acc))
# save trained model
os.makedirs('outputs', exist_ok=True)
joblib.dump(value=model, filename='outputs/model.pkl')
run.complete()
To prepare the hyperdrive experiment, you use a HyperDriveConfig object to configure the experiment run.
hyperdrive = HyperDriveConfig(
estimator=sklearn_estimator,
hyperparameter_sampling=param_sampling,
policy=None,
primary_metric_name='Accuracy',
primary_metric_goal=PrimaryMetricGoal.MAXIMIZE,
max_total_runs=6,
max_concurrent_runs=4
)
experiment = Experiment(workspace=ws, name='hyperdrive_training')
hyperdrive_run = experiment.submit(config=hyperdrive)
You can monitor the hyperdrive experiment in Azure ML studio. The experiment will initiate a child run for each hyperparameter combination to be tried.
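When the run has finished, you can retrieve the best-performing configuration via the SDK, for example (a sketch):
# list all child runs, sorted by the primary metric
for child_run in hyperdrive_run.get_children_sorted_by_primary_metric():
    print(child_run)

# get the best run and its logged metrics
best_run = hyperdrive_run.get_best_run_by_primary_metric()
best_run_metrics = best_run.get_metrics()
print(best_run_metrics['Accuracy'])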
Automate model selection
The visual interface for automated ML in Azure ML Studio is available in the Enterprise edition only.
You can use automated ML to train models for the tasks below. Azure ML supports common algorithms for these tasks:
- classification
- logistic regression
- light gradient boosting machine
- decision tree
- random forest
- naive Bayes
- linear SVM
- XGBoost
- DNN classifier
- others...
- regression
- linear regression
- light gradient boosting machine
- decision tree
- random forest
- elastic net
- LARS Lasso
- XGBoost
- Others
- time series forecasting
- linear regression
- light gradient boosting machine
- decision tree
- random forest
- elastic net
- LARS Lasso
- XGBoost
- others
By default, automated machine learning will randomly select from the full range of algorithms for the specified task. You can choose to block individual algorithms from being selected.
Preprocessing and featurization
Automated ML (AutoML) can apply preprocessing transformations to your data.
- scaling and normalization applied to numeric data automatically
- optional featurization
- missing value imputation
- categorical encoding
- dropping high cardinality features (eg IDs)
- feature engineering (eg date parts from DateTime)
Running AutoML experiment
You can use the Azure ML Studio UI or the SDK (using the AutoMLConfig class).
automl_run_config = RunConfiguration(framework='python')
automl_config = AutoMLConfig(
name='auto ml experiment',
task='classification',
primary_metric='AUC_weighted',
compute_target=aml_compute,
training_data=train_dataset,
validation_data=test_dataset,
label_column_name='label',
featurization='auto',
iterations=12,
max_concurrent_iterations=4
)
With Azure ML Studio, you can create or select an Azure ML dataset to be used as input for your AutoML experiment. When using the SDK, you can submit data by
- specify a dataset or dataframe of training data that includes features and label to be predicted
- optionally, specify a second validation data dataset or dataframe. If this is not provided, Azure ML will apply cross-validation.
Alternatively:
- specify a dataset, dataframe, or numpy array of X values containing features with a corresponding y array of label values
One of the most important settings you specify is primary_metric (ie the target performance metric). Azure ML supports a set of named metrics for each type of task, which you can retrieve with:
from azureml.train.automl.utilities import get_primary_metrics
get_primary_metrics('classification')
You can submit an AutoML experiment like any other SDK-based experiment:
automl_experiment = Experiment(ws, 'automl_experiment')
automl_run = automl_experiment.submit(automl_config)
You can easily identify the best run in Azure ML studio, and download or deploy the model it generated. Via the SDK:
best_run, fitted_model = automl_run.get_output()
best_run_metrics = best_run.get_metrics()
for metric_name in best_run_metrics:
metric = best_run_metrics[metric_name]
print(metric_name, metric)
AutoML uses scikit-learn pipelines. You can view the steps in the fitted model you obtained from the best run.
for step in fitted_model.named_steps:
print(step)
Explain ML models
Model explainers use statistical techniques to calculate feature importance. Explainers work by evaluating a test data set of feature cases and the labels the model predicts for them.
Global feature importance quantifies the relative importance of each feature in the test dataset as a whole: which features in the dataset influence predictions?
Local feature importance measures the influence of each feature value on a specific individual prediction. Example: will Sam default on the loan?
Prediction=0: Sam won't default on the loan repayment.
Features:
- loan amount; support for 0: 0.9; support for 1: -0.9
- income; support for 0: 0.6
- age; support for 0: -0.2
- marital status; support for 0: 0.1
Because this is a classification model, each feature gets a local importance value for each possible class, indicating the amount of support for that class based on the feature value.
The most important feature for a prediction of class 1 is the loan amount. There could be multiple reasons why local importance for an individual prediction varies from global importance for the overall dataset. For example, Sam might have a lower income than average, but the loan amount in this case might be unusually small.
For a multi-class classification model, a local importance value for each possible class is calculated for every feature, with the total across all classes always being 0.
For a regression model, the local importance values simply indicate the level of influence each feature has on the predicted scalar label.
Using explainers
You can use Azure ML SDK to create explainers for models even if they were not trained using an Azure ML experiment.
You install the azureml-interpret package. Types of explainer include:
- MimicExplainer: creates a global surrogate model that approximates your trained model and can be used to generate explanations. This explainable model must have the same kind of architecture as your trained model (eg linear or tree-based)
- TabularExplainer: acts as a wrapper around various SHAP explainer algorithms, automatically choosing the one that is most appropriate for your model architecture
- PFIExplainer (Permutation Feature Importance): analyzes feature importance by shuffling feature values and measuring the impact on prediction performance
Example for a hypothetical model named loan_model:
mim_explainer = MimicExplainer(
model=loan_model,
initialization_examples=X_test,
explainable_model=DecisionTreeExplainableModel,
features=['loan_amount', 'income', 'age', 'marital_status'],
classes=['reject', 'approve']
)
tab_explainer = TabularExplainer(
model=loan_model,
initialization_examples=X_test,
features=['loan_amount', 'income', 'age', 'marital_status'],
classes=['reject', 'approve']
)
pfi_explainer = PFIExplainer(
model=loan_model,
features=['loan_amount', 'income', 'age', 'marital_status'],
classes=['reject', 'approve']
)
To retrieve global feature importance, call the explain_global() method of your explainer, and then use the get_feature_importance_dict() method to get a dictionary of the feature importance values.
global_mim_explanation = mim_explainer.explain_global(X_train)
global_mim_feature_importance = global_mim_explanation.get_feature_importance_dict()
# same pattern as the MimicExplainer
global_tab_explanation = tab_explainer.explain_global(X_train)
global_tab_feature_importance = global_tab_explanation.get_feature_importance_dict()
# the PFIExplainer requires the actual labels
global_pfi_explanation = pfi_explainer.explain_global(X_train, true_labels=y_train)
global_pfi_feature_importance = global_pfi_explanation.get_feature_importance_dict()
To retrieve local feature importance from a MimicExplainer or a TabularExplainer, you must call the explain_local() method, specifying the subset of cases you want to explain. Then you use the get_ranked_local_names() and get_ranked_local_values() methods to retrieve dictionaries.
# same for tab_explainer too
local_mim_explanation = mim_explainer.explain_local(X_test[0:5])
local_mim_features = local_mim_explanation.get_ranked_local_names()
local_mim_importance = local_mim_explanation.get_ranked_local_values()
PFIExplainer does not support local feature importance explanations.
Creating explanations
You can create an explainer and upload the explanation it generates to the run for later analysis.
To create an explanation in the experiment script, you'll need to ensure that the azureml-interpret and azureml-contrib-interpret packages are installed in the run environment. Then you can use them to create an explanation from your trained model and upload it to the run outputs.
run = Run.get_context()
# code to train model goes here
# get explanation
explainer = TabularExplainer(model, X_train, features=features, classes=labels)
explanation = explainer.explain_global(X_test)
# get an explanation client and upload the explanation
explain_client = ExplanationClient.from_run(run)
explain_client.upload_model_explanation(explanation, comment='Tabular Explanation')
run.complete()
You can view the explanation you created for your model in the Explanations tab for the run in Azure ML Studio.
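You can also download the explanation from a completed run via the SDK, for example (a sketch, assuming the same run object):
# retrieve the uploaded explanation from the run
client = ExplanationClient.from_run(run)
downloaded_explanation = client.download_model_explanation()
print(downloaded_explanation.get_feature_importance_dict())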
Visualizing explanations
Model explanations in Azure ML Studio include multiple visualizations that you can use to explore feature importance. Visualizations:
- global feature importance
- summary importance: shows the distribution of individual importance values for each feature across the test dataset
- local feature importance by selecting an individual data point
Monitor models
You can use Application Insights to capture and review telemetry from models published with Azure ML. You must have an Application Insights resource associated with your Azure ML workspace.
When you create an Azure ML workspace, you can select an Application Insights resource. If you do not select an existing resource, a new one is created in the same resource group as your workspace.
When deploying a new real-time service, you can enable Application Insights in the deployment configuration for the service.
dep_config = AciWebservice.deploy_configuration(
cpu_cores=1,
memory_gb=1,
enable_app_insights=True
)
If you want to enable Application Insights for a service that is already deployed, you can modify the deployment configuration for AKS based services in the Azure portal.
Capture and view telemetry
Application Insights automatically captures any information written to the standard output and error logs, and provides a query capability to view data in these logs.
You can write any value to the standard output in the scoring script by using print:
def run(raw_data):
data = json.loads(raw_data)['data']
predictions = model.predict(data)
print('Data: ' + str(data) + ' - Predictions: ' + str(predictions))
return predictions.tolist()
Azure ML creates a custom dimension in the data model for the output you write.
You can use the Log Analytics query interface for Application Insights in the Azure portal. It supports a SQL-like query syntax.
Monitor data drift
Over time there may be trends that change the profile of the data, making your model less accurate. This change in data profiles between training and inferencing is known as data drift.
Azure ML supports data drift monitoring through the use of datasets. You can compare two registered datasets to detect data drift, or you can capture new feature data submitted to a deployed model service and compare it to the dataset with which the model was trained.
You register 2 datasets:
- a baseline dataset: original training data
- a target dataset that will be compared to the baseline at time intervals. This dataset requires a column for each feature you want to compare, and a timestamp column
You define a dataset monitor to detect data drift and trigger alerts if the rate of drift exceeds a specified threshold. You can create dataset monitors using Azure ML Studio or by using the DataDriftDetector class.
monitor = DataDriftDetector.create_from_datasets(
workspace=ws,
name='dataset-drift-monitor',
baseline_data_set=train_ds,
target_data_set=new_data_ds,
compute_target='aml-cluster',
frequency='Week',
feature_list=['age', 'height', 'bmi'],
latency=24
)
You can backfill to immediately compare baseline to existing data in target.
import datetime as dt
backfill = monitor.backfill(dt.datetime.now() - dt.timedelta(weeks=6), dt.datetime.now())
If you have deployed a model as a real-time web service, you can capture new inferencing data as it is submitted, and compare it to the original training data. This has the benefit of automatically collecting new target data as the deployed model is used.
You include the training dataset in the model registration to provide a baseline.
model = Model.register(
workspace=ws,
model_path='./model/model.pkl',
model_name='mymodel',
datasets=[(Dataset.Scenario.TRAINING, train_ds)]
)
You enable data collection for services in which the model is used. You use the ModelDataCollector class in each service's scoring script, writing code to capture data and predictions and write them to the data collector (which will store them in Azure blob storage).
def init():
global model, data_collect, predict_collect
model_name = 'mymodel'
model = joblib.load(Model.get_model_path(model_name))
# enable collection of data and predictions
data_collect = ModelDataCollector(
model_name,
designation='inputs',
features=['age', 'height', 'bmi']
)
predict_collect = ModelDataCollector(
model_name,
designation='predictions',
features=['prediction']
)
def run(raw_data):
    data = json.loads(raw_data)['data']
    predictions = model.predict(data)
    data_collect.collect(data)
    predict_collect.collect(predictions)
    return predictions.tolist()
With the data collection code in place in the scoring script, you can enable data collection in the deployment configuration.
dep_config = AksWebservice.deploy_configuration(collect_model_data=True)
You can configure data drift monitoring for the deployed model by using the DataDriftDetector class.
model = ws.models['mymodel']
datadrift = DataDriftDetector.create_from_model(
ws,
model.name,
model.version,
services=['my-svc'],
frequency='Week'
)
Scheduling alerts
You can specify a threshold for the rate of data drift and an operator email for notifications.
Monitoring works by running a comparison at a scheduled frequency (day, week, or month), and calculating data drift metrics for the features. For dataset monitors, you can specify a latency indicating the number of hours to allow for new data to be collected and added to the target dataset. For deployed model data drift monitors, you can specify a schedule_start time value to indicate when the data drift run should start (if omitted, the run will start at the current time).
Data drift is measured using a calculated magnitude of change in the statistical distributions of feature values over time. You can configure a threshold for data drift magnitude.
alert_email = AlertConfiguration('data_scientist@contoso.com')
monitor = DataDriftDetector.create_from_datasets(
ws,
'dataset-drift-detector',
baseline_data_set,
target_data_set,
compute_target=cpu_cluster,
frequency='Week',
latency=2,
drift_threshold=0.3,
alert_configuration=alert_email
)