From 8c50bcb7e99922dadfb13864c97ea92b48aff3f8 Mon Sep 17 00:00:00 2001 From: Amit Prajapati Date: Mon, 30 Jun 2025 20:29:55 +0530 Subject: [PATCH 1/4] CLI to verify policies to perform operations related to AQUA. --- ads/aqua/cli.py | 2 + ads/aqua/verify_policies/__init__.py | 8 + ads/aqua/verify_policies/constants.py | 9 + ads/aqua/verify_policies/entities.py | 29 +++ ads/aqua/verify_policies/messages.py | 106 +++++++++++ ads/aqua/verify_policies/utils.py | 243 ++++++++++++++++++++++++++ ads/aqua/verify_policies/verify.py | 214 +++++++++++++++++++++++ log.txt | 24 +++ pyproject.toml | 3 +- 9 files changed, 637 insertions(+), 1 deletion(-) create mode 100644 ads/aqua/verify_policies/__init__.py create mode 100644 ads/aqua/verify_policies/constants.py create mode 100644 ads/aqua/verify_policies/entities.py create mode 100644 ads/aqua/verify_policies/messages.py create mode 100644 ads/aqua/verify_policies/utils.py create mode 100644 ads/aqua/verify_policies/verify.py create mode 100644 log.txt diff --git a/ads/aqua/cli.py b/ads/aqua/cli.py index 07e83c974..9fbc83a68 100644 --- a/ads/aqua/cli.py +++ b/ads/aqua/cli.py @@ -14,6 +14,7 @@ from ads.aqua.finetuning import AquaFineTuningApp from ads.aqua.model import AquaModelApp from ads.aqua.modeldeployment import AquaDeploymentApp +from ads.aqua.verify_policies import AquaVerifyPoliciesApp from ads.common.utils import LOG_LEVELS @@ -29,6 +30,7 @@ class AquaCommand: fine_tuning = AquaFineTuningApp deployment = AquaDeploymentApp evaluation = AquaEvaluationApp + verify_policies = AquaVerifyPoliciesApp def __init__( self, diff --git a/ads/aqua/verify_policies/__init__.py b/ads/aqua/verify_policies/__init__.py new file mode 100644 index 000000000..2baac9c00 --- /dev/null +++ b/ads/aqua/verify_policies/__init__.py @@ -0,0 +1,8 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*-- + +# Copyright (c) 2024 Oracle and/or its affiliates. 
+# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ +from ads.aqua.verify_policies.verify import AquaVerifyPoliciesApp + +__all__ = ["AquaVerifyPoliciesApp"] \ No newline at end of file diff --git a/ads/aqua/verify_policies/constants.py b/ads/aqua/verify_policies/constants.py new file mode 100644 index 000000000..14bae3550 --- /dev/null +++ b/ads/aqua/verify_policies/constants.py @@ -0,0 +1,9 @@ +OBS_MANAGE_TEST_FILE = "AQUA Policy Verification - OBJECT STORAGE" +TEST_MODEL_NAME="AQUA Policy Verification - Model" +TEST_MD_NAME="AQUA Policy Verification - Model Deployment" +TEST_EVALUATION_JOB_NAME="AQUA Policy Verification - Job" +TEST_EVALUATION_JOB_RUN_NAME="AQUA Policy Verification - Job Run" +TEST_EVALUATION_MVS_NAME="AQUA Policy Verification - Model Version Set" +TEST_VM_SHAPE="VM.Standard.E4.Flex" +TEST_DEFAULT_JOB_SHAPE = "VM.Standard.E3.Flex" + diff --git a/ads/aqua/verify_policies/entities.py b/ads/aqua/verify_policies/entities.py new file mode 100644 index 000000000..42d880d13 --- /dev/null +++ b/ads/aqua/verify_policies/entities.py @@ -0,0 +1,29 @@ +from dataclasses import dataclass +from ads.common.extended_enum import ExtendedEnum +from ads.common.serializer import DataClassSerializable + + +class PolicyStatus(ExtendedEnum): + SUCCESS = "SUCCESS" + FAILURE = "FAILURE" + UNVERIFIED = "UNVERIFIED" + + +@dataclass(repr=False) +class OperationResultSuccess(DataClassSerializable): + operation: str + status: PolicyStatus = PolicyStatus.SUCCESS + + +@dataclass(repr=False) +class OperationResultFailure(DataClassSerializable): + operation: str + error: str + policy_hint: str + status: PolicyStatus = PolicyStatus.FAILURE + + +@dataclass(repr=False) +class CommonSettings(DataClassSerializable): + compartment_id: str + project_id: str diff --git a/ads/aqua/verify_policies/messages.py b/ads/aqua/verify_policies/messages.py new file mode 100644 index 000000000..0b42da18e --- /dev/null +++ b/ads/aqua/verify_policies/messages.py @@ -0,0 +1,106 @@ +from ads.aqua.verify_policies.utils import VerifyPoliciesUtils + +utils = VerifyPoliciesUtils() +operation_messages = { + utils.list_compartments.__name__: { + "name": "List Compartments", + "error": "Unable to retrieve the list of compartments. Please verify that you have the required permissions to list compartments in your tenancy. ", + "policy_hint": "Allow dynamic-group aqua-dynamic-group to inspect compartments in tenancy" + }, + utils.list_models.__name__: { + "name": "List Models", + "error": "Failed to fetch available models. Ensure that the policies allow you to list models from the Model Catalog in the selected compartment.", + "policy_hint": "Allow dynamic-group aqua-dynamic-group to manage data-science-models in compartment " + }, + utils.list_project.__name__: { + "name": "List Projects", + "error": "Failed to list Data Science projects. Verify that you have the appropriate permission to access projects in the selected compartment.", + "policy_hint": "Allow dynamic-group aqua-dynamic-group to manage data-science-projects in compartment " + }, + utils.list_model_version_sets.__name__: { + "name": "List Model Version Sets", + "error": "Unable to load model version sets. Check your access rights to list model version sets in the selected compartment.", + "policy_hint": "Allow dynamic-group aqua-dynamic-group to manage data-science-modelversionsets in compartment " + }, + utils.list_jobs.__name__: { + "name": "List Jobs", + "error": "Job list could not be retrieved. 
Please confirm that you have the necessary permissions to view jobs in the compartment.", + "policy_hint": "Allow dynamic-group aqua-dynamic-group to manage data-science-jobs in compartment " + }, + utils.list_job_runs.__name__: { + "name": "List Job Runs", + "error": "Job Runs list could not be retrieved. Please confirm that you have the necessary permissions to view job runs in the compartme", + "policy_hint": "Allow dynamic-group aqua-dynamic-group to manage data-science-job-runs in compartment " + }, + utils.list_buckets.__name__: { + "name": "List Object Storage Buckets", + "error": "Cannot fetch Object Storage buckets. Verify that you have access to list buckets within the specified compartment.", + "policy_hint": "Allow dynamic-group aqua-dynamic-group to read objectstorage-namespaces in compartment \nAllow dynamic-group aqua-dynamic-group to read buckets in compartment " + }, + utils.manage_bucket.__name__: { + "name": "Object Storage Access", + "error": "Failed to access the Object Storage bucket. Verify that the bucket exists and you have write permissions.", + "policy_hint": "Allow dynamic-group aqua-dynamic-group to manage object-family in compartment where any {target.bucket.name=''}" + }, + utils.list_log_groups.__name__: { + "name": "List Log Groups", + "error": "Log groups or logs could not be retrieved. Please confirm you have logging read access for the selected compartment.", + "policy_hint": "Allow dynamic-group aqua-dynamic-group to use logging-family in compartment " + }, + utils.get_resource_availability.__name__: { + "name": "Verify Shape Limits", + "error": "Failed to retrieve shape limits for your compartment. Make sure the required policies are in place to read shape and quota data.", + "policy_hint": "Allow dynamic-group aqua-dynamic-group to read resource-availability in compartment " + }, + utils.register_model.__name__: { + "name": "Register Model", + "error": "Model registration failed. Ensure you have the correct permissions to write to the Model Catalog and access Object Storage.", + "policy_hint": "Allow dynamic-group aqua-dynamic-group to manage data-science-models in compartment " + }, + utils.aqua_model.delete_model.__name__: { + "name": "Delete Model", + "error": "Could not delete model. Please confirm you have delete permissions for Model Catalog resources in the compartment.", + "policy_hint": "Allow dynamic-group aqua-dynamic-group to manage data-science-models in compartment " + }, + utils.aqua_deploy.create.__name__: { + "name": "Model Deployment", + "error": "Model deployment could not be created. Confirm you have correct permissions to deploy models to the Model Deployment service.", + "policy_hint": "Allow dynamic-group aqua-dynamic-group to manage data-science-model-deployments in compartment " + }, + utils.create_job.__name__: { + "name": "Create Job", + "error": "Job could not be created. Please check if you have permissions to create Data Science jobs.", + "policy_hint": "Allow dynamic-group aqua-dynamic-group to manage data-science-jobs in compartment " + }, + utils.create_job_run.__name__: { + "name": "Create Job Run", + "error": "Job Run could not be created. Confirm that you are allowed to run jobs in the selected compartment.", + "policy_hint": "Allow dynamic-group aqua-dynamic-group to manage data-science-job-runs in compartment " + }, + "delete_job": { + "name": "Delete Job", + "error": "Job could not be deleted. 
Please check if you have permissions to delete Data Science jobs.", + "policy_hint": "Allow dynamic-group aqua-dynamic-group to manage data-science-jobs in compartment " + }, + utils.aqua_model.create_model_version_set.__name__: { + "name": "Create Model Version Set", + "error": "Unable to create a model version set for storing evaluation results. Ensure that required Model Catalog permissions are set.", + "policy_hint": "Allow dynamic-group aqua-dynamic-group to manage data-science-modelversionsets in compartment " + }, + utils.aqua_model.ds_client.delete_model_version_set.__name__: { + "name": "Delete Model Version Set", + "error": "Unable to delete a model version. Ensure that required Model Catalog permissions are set.", + "policy_hint": "Allow dynamic-group aqua-dynamic-group to manage data-science-modelversionsets in compartment " + }, + utils.create_model_deployment.__name__: { + "name": "Create Model Deployment", + "error": "Model deployment could not be created. Confirm you have correct permissions to deploy models to the Model Deployment service.", + "policy_hint": "Allow dynamic-group aqua-dynamic-group to manage data-science-model-deployments in compartment " + }, + utils.aqua_deploy.delete.__name__: { + "name": "Delete Model Deployment", + "error": "Unable to delete the model deployment. Please check if you have appropriate permissions to manage deployments.", + "policy_hint": "Allow dynamic-group aqua-dynamic-group to manage data-science-model-deployments in compartment " + } + +} diff --git a/ads/aqua/verify_policies/utils.py b/ads/aqua/verify_policies/utils.py new file mode 100644 index 000000000..4aef941ab --- /dev/null +++ b/ads/aqua/verify_policies/utils.py @@ -0,0 +1,243 @@ +import logging +from ads.aqua.model.model import AquaModelApp +from ads.aqua.modeldeployment import AquaDeploymentApp +from ads.aqua.verify_policies.constants import OBS_MANAGE_TEST_FILE, TEST_DEFAULT_JOB_SHAPE, TEST_MD_NAME, \ + TEST_MODEL_NAME +from ads.aqua.verify_policies.entities import PolicyStatus +from ads.common.auth import default_signer +from ads.common.oci_mixin import LIFECYCLE_STOP_STATE +from ads.config import COMPARTMENT_OCID, DATA_SCIENCE_SERVICE_NAME, TENANCY_OCID, PROJECT_OCID +from ads.common import oci_client +from rich.console import Console +from rich.logging import RichHandler + +logger = logging.getLogger("aqua.policies") + +import oci + + +class VerifyPoliciesUtils: + def __init__(self): + self.aqua_model = AquaModelApp() + self.aqua_deploy = AquaDeploymentApp() + self.obs_client = oci_client.OCIClientFactory(**default_signer()).object_storage + self.model_id = None + self.job_id = None + self.limit = 3 + + def list_compartments(self, **kwargs): + compartment_id = kwargs.pop("compartment_id", TENANCY_OCID) + limit = kwargs.pop("limit", self.limit) + return self.aqua_model.identity_client.list_compartments(compartment_id=compartment_id, limit=limit, + **kwargs).data + + def list_models(self, **kwargs): + return self.aqua_model.list(**kwargs) + + def list_log_groups(self, **kwargs): + compartment_id = kwargs.pop("compartment_id", COMPARTMENT_OCID) + limit = kwargs.pop("limit", self.limit) + return self.aqua_model.logging_client.list_log_groups(compartment_id=compartment_id, limit=limit, **kwargs).data + + def list_log(self, **kwargs): + log_group_id = kwargs.pop("log_group_id") + limit = kwargs.pop("limit", self.limit) + return self.aqua_model.logging_client.list_logs(log_group_id=log_group_id, limit=limit, **kwargs).data + + def list_project(self, **kwargs): + compartment_id 
= kwargs.pop("compartment_id", COMPARTMENT_OCID) + limit = kwargs.pop("limit", self.limit) + return self.aqua_model.ds_client.list_projects(compartment_id=compartment_id, limit=limit, **kwargs).data + + def list_model_version_sets(self, **kwargs): + compartment_id = kwargs.pop("compartment_id", COMPARTMENT_OCID) + limit = kwargs.pop("limit", self.limit) + return self.aqua_model.ds_client.list_model_version_sets(compartment_id=compartment_id, limit=limit, + **kwargs).data + + def list_jobs(self, **kwargs): + compartment_id = kwargs.pop("compartment_id", COMPARTMENT_OCID) + limit = kwargs.pop("limit", self.limit) + return self.aqua_model.ds_client.list_jobs(compartment_id=compartment_id, limit=limit, **kwargs).data + + def list_job_runs(self, **kwargs): + compartment_id = kwargs.pop("compartment_id", COMPARTMENT_OCID) + limit = kwargs.pop("limit", self.limit) + return self.aqua_model.ds_client.list_job_runs(compartment_id=compartment_id, limit=limit, **kwargs).data + + def list_buckets(self, **kwargs): + compartment_id = kwargs.pop("compartment_id", COMPARTMENT_OCID) + limit = kwargs.pop("limit", self.limit) + namespace_name = self.obs_client.get_namespace(compartment_id=compartment_id).data + return self.obs_client.list_buckets(namespace_name=namespace_name, compartment_id=compartment_id, limit=limit, + **kwargs).data + + def manage_bucket(self, **kwargs): + compartment_id = kwargs.pop("compartment_id", COMPARTMENT_OCID) + namespace_name = self.obs_client.get_namespace(compartment_id=compartment_id).data + bucket = kwargs.pop("bucket") + logger.info(f"Creating file in object storage with name {OBS_MANAGE_TEST_FILE} in bucket {bucket}") + self.obs_client.put_object(namespace_name, bucket, object_name=OBS_MANAGE_TEST_FILE, put_object_body="TEST") + logger.info(f"Deleting file {OBS_MANAGE_TEST_FILE} from object storage") + self.obs_client.delete_object(namespace_name, bucket, object_name=OBS_MANAGE_TEST_FILE) + return True + + def list_model_deployment_shapes(self, **kwargs): + limit = kwargs.pop("limit", self.limit) + compartment_id = kwargs.pop("compartment_id", COMPARTMENT_OCID) + return self.aqua_model.ds_client.list_model_deployment_shapes(compartment_id=compartment_id, limit=limit, + **kwargs).data + + def get_resource_availability(self, **kwargs): + limits_client = oci_client.OCIClientFactory(**default_signer()).limits + limit_name = kwargs.pop("limit_name") + compartment_id = kwargs.pop("compartment_id", COMPARTMENT_OCID) + return limits_client.get_resource_availability(compartment_id=compartment_id, + service_name=DATA_SCIENCE_SERVICE_NAME, + limit_name=limit_name).data + + def register_model(self, **kwargs): + compartment_id = kwargs.pop("compartment_id", COMPARTMENT_OCID) + project_id = kwargs.pop("project_id", PROJECT_OCID) + + create_model_details = oci.data_science.models.CreateModelDetails( + compartment_id=compartment_id, + project_id=project_id, + display_name=TEST_MODEL_NAME + ) + logger.info(f"Registering test model `{TEST_MODEL_NAME}`") + model_id = self.aqua_model.ds_client.create_model(create_model_details=create_model_details).data.id + self.aqua_model.ds_client.create_model_artifact( + model_id=model_id, + model_artifact=b"7IV6cktUGcHIhur4bXTv" + ).data + self.model_id = model_id + return model_id + + def create_model_deployment(self, **kwargs): + model_id = kwargs.pop("model_id") + instance_shape = kwargs.pop("instance_shape") + model_deployment_instance_shape_config_details = oci.data_science.models.ModelDeploymentInstanceShapeConfigDetails( + ocpus=1, + 
memory_in_gbs=6) + instance_configuration = oci.data_science.models.InstanceConfiguration( + instance_shape_name=instance_shape, + model_deployment_instance_shape_config_details=model_deployment_instance_shape_config_details + ) + model_configuration_details = oci.data_science.models.ModelConfigurationDetails( + model_id=model_id, + instance_configuration=instance_configuration + ) + + model_deployment_configuration_details = oci.data_science.models.SingleModelDeploymentConfigurationDetails( + model_configuration_details=model_configuration_details + ) + create_model_deployment_details = oci.data_science.models.CreateModelDeploymentDetails( + compartment_id=COMPARTMENT_OCID, + project_id=PROJECT_OCID, + display_name=TEST_MD_NAME, + model_deployment_configuration_details=model_deployment_configuration_details, + ) + md_ocid = self.aqua_model.ds_client.create_model_deployment( + create_model_deployment_details=create_model_deployment_details).data.id + waiter_result = oci.wait_until( + self.aqua_model.ds_client, + self.aqua_model.ds_client.get_model_deployment(md_ocid), + evaluate_response=lambda r: self._evaluate_response(wait_message="Waiting for Model Deployment to finish", + response=r), + max_interval_seconds=30, + ) + logger.info("Model Deployment may result in FAILED state.") + return md_ocid + + def _evaluate_response(self, wait_message, response): + logger.info(f"{wait_message}, Current state: {response.data.lifecycle_state}") + return getattr(response.data, 'lifecycle_state').upper() in LIFECYCLE_STOP_STATE + + def create_job(self, **kwargs): + compartment_id = kwargs.pop("compartment_id", COMPARTMENT_OCID) + project_id = kwargs.pop("project_id", PROJECT_OCID) + shape_name = kwargs.pop("shape_name", TEST_DEFAULT_JOB_SHAPE) + display_name = kwargs.pop("display_name") + + response = self.aqua_model.ds_client.create_job( + create_job_details=oci.data_science.models.CreateJobDetails( + display_name=display_name, + project_id=project_id, + compartment_id=compartment_id, + job_configuration_details=oci.data_science.models.DefaultJobConfigurationDetails( + job_type="DEFAULT", + environment_variables={}), + job_infrastructure_configuration_details=oci.data_science.models.StandaloneJobInfrastructureConfigurationDetails( + job_infrastructure_type="ME_STANDALONE", + shape_name=shape_name, + job_shape_config_details=oci.data_science.models.JobShapeConfigDetails( + ocpus=1, + memory_in_gbs=16), + block_storage_size_in_gbs=50 + ) + ) + ) + + job_id = response.data.id + self.aqua_model.ds_client.create_job_artifact(job_id=job_id, job_artifact=b"echo OK\n", + content_disposition="attachment; filename=entry.sh") + self.job_id = job_id + return job_id + + def create_job_run(self, **kwargs): + compartment_id = kwargs.pop("compartment_id", COMPARTMENT_OCID) + project_id = kwargs.pop("project_id", PROJECT_OCID) + job_id = kwargs.pop("job_id") + display_name = kwargs.pop("display_name") + response = self.aqua_model.ds_client.create_job_run( + create_job_run_details=oci.data_science.models.CreateJobRunDetails( + project_id=project_id, + compartment_id=compartment_id, + job_id=job_id, + display_name=display_name + ) + ) + job_run_id = response.data.id + + waiter_result = oci.wait_until( + self.aqua_model.ds_client, + self.aqua_model.ds_client.get_job_run(job_run_id), + evaluate_response=lambda r: self._evaluate_response(wait_message="Waiting for job run to finish", + response=r), + max_interval_seconds=30, + max_wait_seconds=600 + ) + return waiter_result + + def create_model_version_set(self, **kwargs): 
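+        """Create a model version set for the policy verification checks.
+
+        Expects ``name`` in kwargs; ``compartment_id`` and ``project_id`` default to
+        COMPARTMENT_OCID and PROJECT_OCID respectively.
+        """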
+ name = kwargs.pop("name") + compartment_id = kwargs.pop("compartment_id", COMPARTMENT_OCID) + project_id = kwargs.pop("project_id", PROJECT_OCID) + return self.aqua_model.create_model_version_set(model_version_set_name=name, compartment_id=compartment_id, + project_id=project_id) + + +class RichStatusLog: + def __init__(self): + self.console = Console() + # logger = logging.("aqua.policies") + handler = RichHandler(console=self.console, + markup=True, + rich_tracebacks=False, + show_time=False, + show_path=False) + logger.addHandler(handler) + logger.propagate = False + self.logger = logger + + def get_logger(self): + return self.logger + + def get_status_emoji(self, status: PolicyStatus): + if status == PolicyStatus.SUCCESS: + return ":white_check_mark:[green]" + if status == PolicyStatus.FAILURE: + return ":cross_mark:[red]" + if status == PolicyStatus.UNVERIFIED: + return ":exclamation_question_mark:[yellow]" diff --git a/ads/aqua/verify_policies/verify.py b/ads/aqua/verify_policies/verify.py new file mode 100644 index 000000000..bae681373 --- /dev/null +++ b/ads/aqua/verify_policies/verify.py @@ -0,0 +1,214 @@ +import logging + +import click +import oci.exceptions + +from ads.aqua.verify_policies.constants import TEST_EVALUATION_JOB_NAME, TEST_EVALUATION_JOB_RUN_NAME, \ + TEST_EVALUATION_MVS_NAME, TEST_MD_NAME, TEST_VM_SHAPE +from ads.aqua.verify_policies.messages import operation_messages +from ads.aqua.verify_policies.entities import OperationResultSuccess, OperationResultFailure, PolicyStatus +from ads.aqua.verify_policies.utils import VerifyPoliciesUtils, RichStatusLog +from functools import wraps + +logger = logging.getLogger("aqua.policies") + + +def with_spinner(func): + @wraps(func) + def wrapper(self, function, **kwargs): + operation_message = operation_messages[function.__name__] + ignore_spinner = kwargs.pop("ignore_spinner", False) + + def run_func(): + return_value, result_status = func(self, function, **kwargs) + result_message = f"{self._rich_ui.get_status_emoji(result_status.status)} {result_status.operation}" + if result_status.status == PolicyStatus.SUCCESS: + logger.info(result_message) + else: + logger.warning(result_message) + logger.info(result_status.error) + logger.info(f"Policy hint: {result_status.policy_hint}") + return return_value, result_status + + if ignore_spinner: + return run_func() + else: + with self._rich_ui.console.status(f"Verifying {operation_message['name']}") as status: + return run_func() + + return wrapper + + +class AquaVerifyPoliciesApp: + """Provide options to verify policies of common operation in AQUA such as Model Registration, Model Deployment, Evaluation & Fine-Tuning. + + Methods + ------- + common_policies: + Verify if policies are properly defined for operation such as List Compartments, List Models, List Logs, List Projects, List Jobs, etc. + model_register: + Verify if policies are properly defined to register new models into the Model Catalog. Required to use AQUA to upload and manage custom or service models. + model_deployment: + Verify if policies are properly defined to allows users to deploy models to the OCI Model Deployment service. This operation provisions and starts a new deployment instance. + evaluation: + Verify if policies are properly defined to access to resources such as models, jobs, job-runs and buckets needed throughout the evaluation process. 
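+
+    Example
+    -------
+    A minimal usage sketch; the CLI subcommand name follows the ``verify_policies``
+    attribute registered on ``AquaCommand`` in ``ads/aqua/cli.py``:
+
+        >>> from ads.aqua.verify_policies import AquaVerifyPoliciesApp
+        >>> AquaVerifyPoliciesApp().common_policies()
+
+    or, from the command line:
+
+        ads aqua verify_policies common_policies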
+ """ + + def __init__(self): + super().__init__() + self._util = VerifyPoliciesUtils() + self._rich_ui = RichStatusLog() + self.model_id = None + logger.propagate = False + logger.setLevel(logging.INFO) + + def _get_operation_result(self, operation, status): + operation_message = operation_messages[operation.__name__] + if status == PolicyStatus.SUCCESS: + return OperationResultSuccess(operation=operation_message["name"]) + if status == PolicyStatus.UNVERIFIED: + return OperationResultSuccess(operation=operation_message["name"], status=status) + if status == PolicyStatus.FAILURE: + return OperationResultFailure(operation=operation_message["name"], error=operation_message["error"], + policy_hint=operation_message["policy_hint"]) + + @with_spinner + def _execute(self, function, **kwargs): + result = None + try: + result = function(**kwargs) + status = PolicyStatus.SUCCESS + except oci.exceptions.ServiceError as oci_error: + if oci_error.status == 404: + logger.debug(oci_error) + status = PolicyStatus.FAILURE + else: + logger.error(oci_error) + raise oci_error + except Exception as e: + logger.error(e) + raise e + return result, self._get_operation_result(function, status) + + def _test_model_register(self, **kwargs): + result = [] + bucket = kwargs.pop("bucket") + _, test_manage_obs_policy = self._execute(self._util.manage_bucket, bucket=bucket, **kwargs) + result.append(test_manage_obs_policy.to_dict()) + + if test_manage_obs_policy.status == PolicyStatus.SUCCESS: + self.model_id, test_model_register = self._execute(self._util.register_model) + result.append(test_model_register.to_dict()) + return result + + def _test_delete_model(self, **kwargs): + if self.model_id is not None: + _, test_delete_model_test = self._execute(self._util.aqua_model.ds_client.delete_model, + model_id=self.model_id, **kwargs) + return [test_delete_model_test.to_dict()] + else: + return [self._get_operation_result(self._util.aqua_model.ds_client.delete_model, + PolicyStatus.UNVERIFIED).to_dict()] + + def _test_model_deployment(self, **kwargs): + logger.info(f"Creating Model Deployment with name {TEST_MD_NAME}") + md_ocid, test_model_deployment = self._execute(self._util.create_model_deployment, model_id=self.model_id, + instance_shape=TEST_VM_SHAPE) + _, test_delete_md = self._execute(self._util.aqua_deploy.delete, model_deployment_id=md_ocid) + return [test_model_deployment.to_dict(), test_delete_md.to_dict()] + + def _test_delete_model_deployment(self, **kwargs): + pass + + def _prompt(self, message, bool=False): + if bool: + return click.confirm(message, default=False) + else: + return click.prompt(message, type=str) + + def _consent(self): + answer = self._prompt("Do you want to continue?", bool=True) + if not answer: + exit(0) + + def common_policies(self, **kwargs): + logger.info("[magenta]Verifying Common Policies") + basic_operations = [self._util.list_compartments, self._util.list_models, self._util.list_model_version_sets, + self._util.list_project, self._util.list_jobs, self._util.list_job_runs, + self._util.list_buckets, + self._util.list_log_groups + ] + result = [] + for op in basic_operations: + _, status = self._execute(op, **kwargs) + result.append(status.to_dict()) + + _, get_resource_availability_status = self._execute(self._util.get_resource_availability, + limit_name="ds-gpu-a10-count") + result.append(get_resource_availability_status.to_dict()) + return result + + def model_register(self, **kwargs): + logger.info("[magenta]Verifying Model Register") + logger.info("Object and Model will be 
created.") + kwargs.pop("consent", None) == True or self._consent() + + model_save_bucket = kwargs.pop("bucket", None) or self._prompt( + "Provide bucket name where model artifacts will be saved") + register_model_result = self._test_model_register(bucket=model_save_bucket) + delete_model_result = self._test_delete_model(**kwargs) + return [*register_model_result, *delete_model_result] + + def model_deployment(self, **kwargs): + logger.info("[magenta]Verifying Model Deployment") + logger.info("Object, Model, Model deployment will be created.") + kwargs.pop("consent", None) == True or self._consent() + model_save_bucket = kwargs.pop("bucket", None) or self._prompt( + "Provide bucket name where model artifacts will be saved") + model_register = self._test_model_register(bucket=model_save_bucket) + model_deployment = self._test_model_deployment() + delete_model_result = self._test_delete_model(**kwargs) + + return [*model_register, *model_deployment, *delete_model_result] + + def evaluation(self, **kwargs): + logger.info("[magenta]Verifying Evaluation") + logger.info("Model Version Set, Model, Object, Job and JobRun will be created.") + kwargs.pop("consent", None) == True or self._consent() + + # Create & Delete MVS + logger.info(f"Creating Model Version set with name {TEST_EVALUATION_MVS_NAME}") + + model_mvs, test_create_mvs = self._execute(self._util.aqua_model.create_model_version_set, + name=TEST_EVALUATION_MVS_NAME) + model_mvs_id = model_mvs[0] + if model_mvs_id: + logger.info(f"Deleting Model Version set {TEST_EVALUATION_MVS_NAME}") + _, delete_mvs = self._execute(self._util.aqua_model.ds_client.delete_model_version_set, + model_version_set_id=model_mvs_id) + else: + delete_mvs = self._get_operation_result(self._util.aqua_model.ds_client.delete_model_version_set, + PolicyStatus.UNVERIFIED) + + # Create & Model + model_save_bucket = kwargs.pop("bucket", None) or self._prompt( + "Provide bucket name where model artifacts will be saved") + register_model_result = self._test_model_register(bucket=model_save_bucket) + delete_model_result = self._test_delete_model(**kwargs) + + # Create Job & JobRun. + evaluation_job_id, test_create_job = self._execute(self._util.create_job, display_name=TEST_EVALUATION_JOB_NAME, + **kwargs) + _, test_create_job_run = self._execute(self._util.create_job_run, display_name=TEST_EVALUATION_JOB_RUN_NAME, + job_id=evaluation_job_id, **kwargs) + + # Delete Job Run + if evaluation_job_id: + _, delete_job = self._execute(self._util.aqua_model.ds_client.delete_job, job_id=evaluation_job_id) + else: + delete_job = self._get_operation_result(self._util.aqua_model.ds_client.delete_job, PolicyStatus.UNVERIFIED) + + return [test_create_mvs.to_dict(), delete_mvs.to_dict(), *register_model_result, *delete_model_result, + test_create_job.to_dict(), test_create_job_run.to_dict(), delete_job.to_dict()] + + diff --git a/log.txt b/log.txt new file mode 100644 index 000000000..317fb8f92 --- /dev/null +++ b/log.txt @@ -0,0 +1,24 @@ +👋 Waiting, Amit! +👋 Waiting, Amit! +👋 Waiting, Amit! +👋 Waiting, Amit! +👋 Waiting, Amit! +⠋ Processing...✅ Done after 5 seconds! 
+None +INFO Verifying Common Policies +INFO ✅ List Compartments +INFO ✅ List Models +INFO ✅ List Model Version Sets +INFO ✅ List Projects +INFO ✅ List Job Runs +INFO ✅ List Jobs +INFO ✅ List Object Storage Buckets +INFO ✅ List Logs +{'operation': 'List Compartments', 'status': 'SUCCESS'} +{'operation': 'List Models', 'status': 'SUCCESS'} +{'operation': 'List Model Version Sets', 'status': 'SUCCESS'} +{'operation': 'List Projects', 'status': 'SUCCESS'} +{'operation': 'List Job Runs', 'status': 'SUCCESS'} +{'operation': 'List Jobs', 'status': 'SUCCESS'} +{'operation': 'List Object Storage Buckets', 'status': 'SUCCESS'} +{'operation': 'List Logs', 'status': 'SUCCESS'} diff --git a/pyproject.toml b/pyproject.toml index 8c1a4d6cd..9d121e927 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -222,7 +222,8 @@ aqua = [ "fire", "cachetools", "huggingface_hub", - "python-dotenv" + "python-dotenv", + "rich" ] # To reduce backtracking (decrese deps install time) during test/dev env setup reducing number of versions pip is From d3932b9798fd198af27992890de3acf5e5955313 Mon Sep 17 00:00:00 2001 From: Amit Prajapati Date: Thu, 3 Jul 2025 22:22:03 +0530 Subject: [PATCH 2/4] Unint Test and documentation --- ads/aqua/verify_policies/constants.py | 6 +- ads/aqua/verify_policies/messages.py | 7 +- ads/aqua/verify_policies/utils.py | 8 +- ads/aqua/verify_policies/verify.py | 198 ++++++++++--- .../with_extras/aqua/test_verify_policies.py | 276 ++++++++++++++++++ .../aqua/test_verify_policies_utils.py | 189 ++++++++++++ 6 files changed, 636 insertions(+), 48 deletions(-) create mode 100644 tests/unitary/with_extras/aqua/test_verify_policies.py create mode 100644 tests/unitary/with_extras/aqua/test_verify_policies_utils.py diff --git a/ads/aqua/verify_policies/constants.py b/ads/aqua/verify_policies/constants.py index 14bae3550..6fbb1256d 100644 --- a/ads/aqua/verify_policies/constants.py +++ b/ads/aqua/verify_policies/constants.py @@ -1,9 +1,9 @@ OBS_MANAGE_TEST_FILE = "AQUA Policy Verification - OBJECT STORAGE" TEST_MODEL_NAME="AQUA Policy Verification - Model" TEST_MD_NAME="AQUA Policy Verification - Model Deployment" -TEST_EVALUATION_JOB_NAME="AQUA Policy Verification - Job" -TEST_EVALUATION_JOB_RUN_NAME="AQUA Policy Verification - Job Run" -TEST_EVALUATION_MVS_NAME="AQUA Policy Verification - Model Version Set" +TEST_JOB_NAME="AQUA Policy Verification - Job" +TEST_JOB_RUN_NAME="AQUA Policy Verification - Job Run" +TEST_MVS_NAME="AQUA Policy Verification - Model Version Set" TEST_VM_SHAPE="VM.Standard.E4.Flex" TEST_DEFAULT_JOB_SHAPE = "VM.Standard.E3.Flex" diff --git a/ads/aqua/verify_policies/messages.py b/ads/aqua/verify_policies/messages.py index 0b42da18e..21de792d7 100644 --- a/ads/aqua/verify_policies/messages.py +++ b/ads/aqua/verify_policies/messages.py @@ -62,11 +62,6 @@ "error": "Could not delete model. Please confirm you have delete permissions for Model Catalog resources in the compartment.", "policy_hint": "Allow dynamic-group aqua-dynamic-group to manage data-science-models in compartment " }, - utils.aqua_deploy.create.__name__: { - "name": "Model Deployment", - "error": "Model deployment could not be created. Confirm you have correct permissions to deploy models to the Model Deployment service.", - "policy_hint": "Allow dynamic-group aqua-dynamic-group to manage data-science-model-deployments in compartment " - }, utils.create_job.__name__: { "name": "Create Job", "error": "Job could not be created. 
Please check if you have permissions to create Data Science jobs.", @@ -97,7 +92,7 @@ "error": "Model deployment could not be created. Confirm you have correct permissions to deploy models to the Model Deployment service.", "policy_hint": "Allow dynamic-group aqua-dynamic-group to manage data-science-model-deployments in compartment " }, - utils.aqua_deploy.delete.__name__: { + utils.aqua_model.ds_client.delete_model_deployment.__name__: { "name": "Delete Model Deployment", "error": "Unable to delete the model deployment. Please check if you have appropriate permissions to manage deployments.", "policy_hint": "Allow dynamic-group aqua-dynamic-group to manage data-science-model-deployments in compartment " diff --git a/ads/aqua/verify_policies/utils.py b/ads/aqua/verify_policies/utils.py index 4aef941ab..f280ecb8f 100644 --- a/ads/aqua/verify_policies/utils.py +++ b/ads/aqua/verify_policies/utils.py @@ -17,9 +17,13 @@ class VerifyPoliciesUtils: + """ + Utility class for verifying OCI IAM policies through operations on Data Science resources. + Provides methods to interact with models, model deployments, jobs, object storage, and limits APIs + using Oracle Accelerated Data Science (ADS) SDK. + """ def __init__(self): self.aqua_model = AquaModelApp() - self.aqua_deploy = AquaDeploymentApp() self.obs_client = oci_client.OCIClientFactory(**default_signer()).object_storage self.model_id = None self.job_id = None @@ -159,6 +163,7 @@ def create_job(self, **kwargs): project_id = kwargs.pop("project_id", PROJECT_OCID) shape_name = kwargs.pop("shape_name", TEST_DEFAULT_JOB_SHAPE) display_name = kwargs.pop("display_name") + subnet_id = kwargs.pop("subnet_id", None) response = self.aqua_model.ds_client.create_job( create_job_details=oci.data_science.models.CreateJobDetails( @@ -171,6 +176,7 @@ def create_job(self, **kwargs): job_infrastructure_configuration_details=oci.data_science.models.StandaloneJobInfrastructureConfigurationDetails( job_infrastructure_type="ME_STANDALONE", shape_name=shape_name, + subnet_id=subnet_id, job_shape_config_details=oci.data_science.models.JobShapeConfigDetails( ocpus=1, memory_in_gbs=16), diff --git a/ads/aqua/verify_policies/verify.py b/ads/aqua/verify_policies/verify.py index bae681373..68a14878e 100644 --- a/ads/aqua/verify_policies/verify.py +++ b/ads/aqua/verify_policies/verify.py @@ -3,8 +3,8 @@ import click import oci.exceptions -from ads.aqua.verify_policies.constants import TEST_EVALUATION_JOB_NAME, TEST_EVALUATION_JOB_RUN_NAME, \ - TEST_EVALUATION_MVS_NAME, TEST_MD_NAME, TEST_VM_SHAPE +from ads.aqua.verify_policies.constants import TEST_JOB_NAME, TEST_JOB_RUN_NAME, \ + TEST_MVS_NAME, TEST_MD_NAME, TEST_VM_SHAPE from ads.aqua.verify_policies.messages import operation_messages from ads.aqua.verify_policies.entities import OperationResultSuccess, OperationResultFailure, PolicyStatus from ads.aqua.verify_policies.utils import VerifyPoliciesUtils, RichStatusLog @@ -14,6 +14,12 @@ def with_spinner(func): + """Decorator to wrap execution of a function with a rich UI spinner. + + Displays status while the operation runs and logs success or failure messages + based on the policy verification result. + """ + @wraps(func) def wrapper(self, function, **kwargs): operation_message = operation_messages[function.__name__] @@ -40,18 +46,21 @@ def run_func(): class AquaVerifyPoliciesApp: - """Provide options to verify policies of common operation in AQUA such as Model Registration, Model Deployment, Evaluation & Fine-Tuning. 
+ """ + AquaVerifyPoliciesApp provides methods to verify IAM policies required for + various operations in OCI Data Science's AQUA (Accelerated Data Science) platform. + + This utility is intended to help users validate whether they have the necessary + permissions to perform common AQUA workflows such as model registration, + deployment, evaluation, and fine-tuning. Methods ------- - common_policies: - Verify if policies are properly defined for operation such as List Compartments, List Models, List Logs, List Projects, List Jobs, etc. - model_register: - Verify if policies are properly defined to register new models into the Model Catalog. Required to use AQUA to upload and manage custom or service models. - model_deployment: - Verify if policies are properly defined to allows users to deploy models to the OCI Model Deployment service. This operation provisions and starts a new deployment instance. - evaluation: - Verify if policies are properly defined to access to resources such as models, jobs, job-runs and buckets needed throughout the evaluation process. + `common_policies()`: Validates basic read-level policies across AQUA components. + `model_register()`: Checks policies for object storage access and model registration. + `model_deployment()`: Validates policies for registering and deploying models. + `evaluation()`: Confirms ability to manage model version sets, jobs, and storage for evaluation. + `finetune()`: Verifies access required to fine-tune models. """ def __init__(self): @@ -63,6 +72,15 @@ def __init__(self): logger.setLevel(logging.INFO) def _get_operation_result(self, operation, status): + """Maps a function and policy status to a corresponding result object. + + Parameters: + operation (function): The operation being verified. + status (PolicyStatus): The outcome of the policy verification. + + Returns: + OperationResultSuccess or OperationResultFailure based on status. + """ operation_message = operation_messages[operation.__name__] if status == PolicyStatus.SUCCESS: return OperationResultSuccess(operation=operation_message["name"]) @@ -74,6 +92,14 @@ def _get_operation_result(self, operation, status): @with_spinner def _execute(self, function, **kwargs): + """Executes a given operation function with policy validation and error handling. + Parameters: + function (callable): The function to execute. + kwargs (dict): Keyword arguments to pass to the function. + + Returns: + Tuple: (result, OperationResult) + """ result = None try: result = function(**kwargs) @@ -91,6 +117,11 @@ def _execute(self, function, **kwargs): return result, self._get_operation_result(function, status) def _test_model_register(self, **kwargs): + """Verifies policies required to manage an object storage bucket and register a model. + + Returns: + List of result dicts for bucket management and model registration. + """ result = [] bucket = kwargs.pop("bucket") _, test_manage_obs_policy = self._execute(self._util.manage_bucket, bucket=bucket, **kwargs) @@ -102,6 +133,11 @@ def _test_model_register(self, **kwargs): return result def _test_delete_model(self, **kwargs): + """Attempts to delete the test model created during model registration. + + Returns: + List containing the result of model deletion. 
+ """ if self.model_id is not None: _, test_delete_model_test = self._execute(self._util.aqua_model.ds_client.delete_model, model_id=self.model_id, **kwargs) @@ -111,27 +147,89 @@ def _test_delete_model(self, **kwargs): PolicyStatus.UNVERIFIED).to_dict()] def _test_model_deployment(self, **kwargs): + """Verifies policies required to create and delete a model deployment. + + Returns: + List of result dicts for deployment creation and deletion. + """ logger.info(f"Creating Model Deployment with name {TEST_MD_NAME}") md_ocid, test_model_deployment = self._execute(self._util.create_model_deployment, model_id=self.model_id, instance_shape=TEST_VM_SHAPE) - _, test_delete_md = self._execute(self._util.aqua_deploy.delete, model_deployment_id=md_ocid) + _, test_delete_md = self._execute(self._util.aqua_model.ds_client.delete_model_deployment, model_deployment_id=md_ocid) return [test_model_deployment.to_dict(), test_delete_md.to_dict()] - def _test_delete_model_deployment(self, **kwargs): - pass + def _test_manage_mvs(self, **kwargs): + """Verifies policies required to create and delete a model version set (MVS). + + Returns: + List of result dicts for MVS creation and deletion. + """ + logger.info(f"Creating Model Version set with name {TEST_MVS_NAME}") + + model_mvs, test_create_mvs = self._execute(self._util.aqua_model.create_model_version_set, + name=TEST_MVS_NAME) + model_mvs_id = model_mvs[0] + if model_mvs_id: + logger.info(f"Deleting Model Version set {TEST_MVS_NAME}") + _, delete_mvs = self._execute(self._util.aqua_model.ds_client.delete_model_version_set, + model_version_set_id=model_mvs_id) + else: + delete_mvs = self._get_operation_result(self._util.aqua_model.ds_client.delete_model_version_set, + PolicyStatus.UNVERIFIED) + return [test_create_mvs.to_dict(), delete_mvs.to_dict()] + + def _test_manage_job(self, **kwargs): + """Verifies policies required to create a job, create a job run, and delete the job. + + Returns: + List of result dicts for job creation, job run creation, and job deletion. + """ + + # Create Job & JobRun. + evaluation_job_id, test_create_job = self._execute(self._util.create_job, display_name=TEST_JOB_NAME, + **kwargs) + _, test_create_job_run = self._execute(self._util.create_job_run, display_name=TEST_JOB_RUN_NAME, + job_id=evaluation_job_id, **kwargs) + + # Delete Job Run + if evaluation_job_id: + _, delete_job = self._execute(self._util.aqua_model.ds_client.delete_job, job_id=evaluation_job_id) + else: + delete_job = self._get_operation_result(self._util.aqua_model.ds_client.delete_job, PolicyStatus.UNVERIFIED) + + return [test_create_job.to_dict(), test_create_job_run.to_dict(), delete_job.to_dict()] def _prompt(self, message, bool=False): + """Wrapper for Click prompt or confirmation. + + Parameters: + message (str): The prompt message. + bool (bool): Whether to ask for confirmation instead of input. + + Returns: + User input or confirmation (bool/str). + """ if bool: return click.confirm(message, default=False) else: return click.prompt(message, type=str) def _consent(self): + """ + Prompts the user for confirmation before performing actions. + Exits if the user does not consent. + """ answer = self._prompt("Do you want to continue?", bool=True) if not answer: exit(0) def common_policies(self, **kwargs): + """Verifies basic read-level policies across various AQUA components + (e.g. compartments, models, jobs, buckets, logs). + + Returns: + List of result dicts for each verified operation. 
+ """ logger.info("[magenta]Verifying Common Policies") basic_operations = [self._util.list_compartments, self._util.list_models, self._util.list_model_version_sets, self._util.list_project, self._util.list_jobs, self._util.list_job_runs, @@ -149,6 +247,11 @@ def common_policies(self, **kwargs): return result def model_register(self, **kwargs): + """Verifies policies required to register a model, including object storage access. + + Returns: + List of result dicts for registration and cleanup. + """ logger.info("[magenta]Verifying Model Register") logger.info("Object and Model will be created.") kwargs.pop("consent", None) == True or self._consent() @@ -160,6 +263,11 @@ def model_register(self, **kwargs): return [*register_model_result, *delete_model_result] def model_deployment(self, **kwargs): + """Verifies policies required to register and deploy a model, and perform cleanup. + + Returns: + List of result dicts for registration, deployment, and cleanup. + """ logger.info("[magenta]Verifying Model Deployment") logger.info("Object, Model, Model deployment will be created.") kwargs.pop("consent", None) == True or self._consent() @@ -172,23 +280,18 @@ def model_deployment(self, **kwargs): return [*model_register, *model_deployment, *delete_model_result] def evaluation(self, **kwargs): + """Verifies policies for evaluation workloads including model version set, + job and job runs, and object storage access. + + Returns: + List of result dicts for all evaluation steps. + """ logger.info("[magenta]Verifying Evaluation") logger.info("Model Version Set, Model, Object, Job and JobRun will be created.") kwargs.pop("consent", None) == True or self._consent() # Create & Delete MVS - logger.info(f"Creating Model Version set with name {TEST_EVALUATION_MVS_NAME}") - - model_mvs, test_create_mvs = self._execute(self._util.aqua_model.create_model_version_set, - name=TEST_EVALUATION_MVS_NAME) - model_mvs_id = model_mvs[0] - if model_mvs_id: - logger.info(f"Deleting Model Version set {TEST_EVALUATION_MVS_NAME}") - _, delete_mvs = self._execute(self._util.aqua_model.ds_client.delete_model_version_set, - model_version_set_id=model_mvs_id) - else: - delete_mvs = self._get_operation_result(self._util.aqua_model.ds_client.delete_model_version_set, - PolicyStatus.UNVERIFIED) + test_manage_mvs = self._test_manage_mvs(**kwargs) # Create & Model model_save_bucket = kwargs.pop("bucket", None) or self._prompt( @@ -196,19 +299,38 @@ def evaluation(self, **kwargs): register_model_result = self._test_model_register(bucket=model_save_bucket) delete_model_result = self._test_delete_model(**kwargs) - # Create Job & JobRun. - evaluation_job_id, test_create_job = self._execute(self._util.create_job, display_name=TEST_EVALUATION_JOB_NAME, - **kwargs) - _, test_create_job_run = self._execute(self._util.create_job_run, display_name=TEST_EVALUATION_JOB_RUN_NAME, - job_id=evaluation_job_id, **kwargs) + # Manage Jobs & Job Runs + test_job_and_job_run = self._test_manage_job(**kwargs) - # Delete Job Run - if evaluation_job_id: - _, delete_job = self._execute(self._util.aqua_model.ds_client.delete_job, job_id=evaluation_job_id) - else: - delete_job = self._get_operation_result(self._util.aqua_model.ds_client.delete_job, PolicyStatus.UNVERIFIED) + return [*test_manage_mvs, *register_model_result, *delete_model_result, *test_job_and_job_run] + + def finetune(self, **kwargs): + """Verifies policies for fine-tuning jobs, including managing object storage, + MVS. + + Returns: + List of result dicts for each fine-tuning operation. 
+ """ + logger.info("[magenta]Verifying Finetuning") + logger.info("Object, Model Version Set, Job and JobRun will be created. VCN will be used.") + kwargs.pop("consent", None) == True or self._consent() + + # Manage bucket + bucket = kwargs.pop("bucket", None) or self._prompt( + "Provide bucket name required to save training datasets, scripts, and fine-tuned model outputs") + + subnet_id = kwargs.pop("subnet_id", None) + if subnet_id is None: + if self._prompt("Do you want to use custom subnet", bool=True): + subnet_id = self._prompt("Provide subnet id") + + _, test_manage_obs_policy = self._execute(self._util.manage_bucket, bucket=bucket, **kwargs) + + # Create & Delete MVS + test_manage_mvs = self._test_manage_mvs(**kwargs) - return [test_create_mvs.to_dict(), delete_mvs.to_dict(), *register_model_result, *delete_model_result, - test_create_job.to_dict(), test_create_job_run.to_dict(), delete_job.to_dict()] + # Manage Jobs & Job Runs + test_job_and_job_run = self._test_manage_job(subnet_id = subnet_id, **kwargs) + return [*test_manage_mvs, *test_job_and_job_run, test_manage_obs_policy.to_dict()] diff --git a/tests/unitary/with_extras/aqua/test_verify_policies.py b/tests/unitary/with_extras/aqua/test_verify_policies.py new file mode 100644 index 000000000..ae570e70e --- /dev/null +++ b/tests/unitary/with_extras/aqua/test_verify_policies.py @@ -0,0 +1,276 @@ +from unittest.mock import MagicMock, patch +import pytest +from ads.aqua.verify_policies.verify import AquaVerifyPoliciesApp +from ads.aqua.verify_policies.entities import PolicyStatus +from ads.aqua.verify_policies.entities import OperationResultSuccess, OperationResultFailure +from ads.aqua.verify_policies.messages import operation_messages +from oci.exceptions import ServiceError +from contextlib import contextmanager + +## Test Helper Operations + +# Dummy functions +def mock_success_function(): + return "success result" + +def mock_oci_error_function(**kwargs): + raise ServiceError(status=404, code="NotAuthorizedOrNotFound", message="Resource not found", headers={}, request_id="") + +# Register the function name in operation_messages for the test +operation_messages[mock_success_function.__name__] = { + "name": "Mock Success Function", + "error": "Mock error message", + "policy_hint": "Mock policy hint" +} + +operation_messages[mock_oci_error_function.__name__] = { + "name": "Mock OCI Error", + "error": "Expected error", + "policy_hint": "Check IAM policies" +} + +@pytest.fixture +def app(): + return AquaVerifyPoliciesApp() + +@pytest.fixture(autouse=True) +def suppress_logger(): + with patch("ads.aqua.verify_policies.verify.logger", new=MagicMock()): + yield + +def test_get_operation_result_success(app): + result = app._get_operation_result(mock_success_function, PolicyStatus.SUCCESS) + assert isinstance(result, OperationResultSuccess) + assert result.status == PolicyStatus.SUCCESS + assert result.operation == "Mock Success Function" + + +def test_get_operation_result_unverified(app): + result = app._get_operation_result(mock_success_function, PolicyStatus.UNVERIFIED) + assert isinstance(result, OperationResultSuccess) + assert result.status == PolicyStatus.UNVERIFIED + assert result.operation == "Mock Success Function" + + +def test_get_operation_result_failure(app): + result = app._get_operation_result(mock_success_function, PolicyStatus.FAILURE) + assert isinstance(result, OperationResultFailure) + assert result.status == PolicyStatus.FAILURE + assert result.operation == "Mock Success Function" + assert result.error == "Mock 
error message" + assert result.policy_hint == "Mock policy hint" + +def test_execute_success(app): + result, status = app._execute(mock_success_function) + assert result == "success result" + assert status.status == PolicyStatus.SUCCESS + assert status.operation == "Mock Success Function" + +def test_execute_oci_failure_404(app): + _, status = app._execute(mock_oci_error_function) + assert status.status == PolicyStatus.FAILURE + assert status.operation == "Mock OCI Error" + assert "Expected error" in status.error + +def test_test_model_register(app): + with patch.object(app, '_execute') as mock_execute: + # Setup the mock return values + mock_execute.side_effect = [ + (None, MagicMock(status=PolicyStatus.SUCCESS, to_dict=lambda: {"op": "manage_bucket"})), + ("mock_model_id", MagicMock(status=PolicyStatus.SUCCESS, to_dict=lambda: {"op": "register_model"})), + ] + + result = app._test_model_register(bucket="test-bucket") + + assert len(result) == 2 + assert result[0]["op"] == "manage_bucket" + assert result[1]["op"] == "register_model" + assert app.model_id == "mock_model_id" + +def test_test_delete_model_with_model_id(app): + app.model_id = "mock_model_id" + + with patch.object(app, "_execute") as mock_execute: + mock_execute.return_value = (None, MagicMock(status=PolicyStatus.SUCCESS, to_dict=lambda: {"op": "delete_model"})) + result = app._test_delete_model() + + assert len(result) == 1 + assert result[0]["op"] == "delete_model" + mock_execute.assert_called_once() + +def test_test_delete_model_without_model_id(app): + app.model_id = None # Simulate missing model + + result = app._test_delete_model() + + assert len(result) == 1 + assert result[0]["status"] == PolicyStatus.UNVERIFIED + +def test_test_model_deployment(app): + app.model_id = "mock_model_id" + + with patch.object(app, "_execute") as mock_execute: + # First call returns model deployment OCID + success status + # Second call returns success for deletion + mock_execute.side_effect = [ + ("mock_md_ocid", MagicMock(status=PolicyStatus.SUCCESS, to_dict=lambda: {"op": "create_md"})), + (None, MagicMock(status=PolicyStatus.SUCCESS, to_dict=lambda: {"op": "delete_md"})), + ] + + result = app._test_model_deployment() + + assert len(result) == 2 + assert result[0]["op"] == "create_md" + assert result[1]["op"] == "delete_md" + + assert mock_execute.call_count == 2 + mock_execute.assert_any_call(app._util.create_model_deployment, model_id="mock_model_id", instance_shape="VM.Standard.E4.Flex") + mock_execute.assert_any_call(app._util.aqua_model.ds_client.delete_model_deployment, model_deployment_id="mock_md_ocid") + +def test_test_manage_mvs(app): + with patch.object(app, "_execute") as mock_execute: + # Mock create_model_version_set returning MVS ID + mock_execute.side_effect = [ + (["mock_mvs_id"], MagicMock(status=PolicyStatus.SUCCESS, to_dict=lambda: {"op": "create_mvs"})), + (None, MagicMock(status=PolicyStatus.SUCCESS, to_dict=lambda: {"op": "delete_mvs"})), + ] + + result = app._test_manage_mvs() + + assert len(result) == 2 + assert result[0]["op"] == "create_mvs" + assert result[1]["op"] == "delete_mvs" + assert mock_execute.call_count == 2 + +def test_test_manage_job(app): + with patch.object(app, "_execute") as mock_execute: + # Set up sequential return values + mock_execute.side_effect = [ + ("mock_job_id", MagicMock(status=PolicyStatus.SUCCESS, to_dict=lambda: {"op": "create_job"})), + (None, MagicMock(status=PolicyStatus.SUCCESS, to_dict=lambda: {"op": "create_job_run"})), + (None, MagicMock(status=PolicyStatus.SUCCESS, 
to_dict=lambda: {"op": "delete_job"})), + ] + + result = app._test_manage_job() + + assert len(result) == 3 + assert result[0]["op"] == "create_job" + assert result[1]["op"] == "create_job_run" + assert result[2]["op"] == "delete_job" + assert mock_execute.call_count == 3 + + +## Test Public Operations +@contextmanager +def patch_common_app_methods(app): + with patch.object(app, "_consent", MagicMock(return_value=None)), \ + patch.object(app, "_prompt", MagicMock(return_value="mock-bucket")), \ + patch.object(app, "_test_manage_mvs", MagicMock(return_value=[{"op": "create_mvs"}, {"op": "delete_mvs"}])), \ + patch.object(app, "_test_model_register", MagicMock(return_value=[{"op": "register_model"}])), \ + patch.object(app, "_test_model_register", MagicMock(return_value=[{"op": "register_model"}])), \ + patch.object(app, "_test_delete_model", MagicMock(return_value=[{"op": "delete_model"}])), \ + patch.object(app, "_test_model_deployment", MagicMock(return_value=[{"op": "create_md"}, {"op": "delete_md"}])), \ + patch.object(app, "_test_manage_job", MagicMock(return_value=[{"op": "create_job"}, {"op": "create_job_run"}, {"op": "delete_job"}])): + yield app + + +def test_common_policies(app): + with patch.object(app, "_execute") as mock_execute: + mock_execute.return_value = ("mock_return_value", MagicMock(status=PolicyStatus.SUCCESS, to_dict=lambda: {"op": "operation_success"})) + app.common_policies() + + assert mock_execute.call_count == 9 + mock_execute.assert_any_call(app._util.list_compartments) + mock_execute.assert_any_call(app._util.list_models) + mock_execute.assert_any_call(app._util.list_model_version_sets) + mock_execute.assert_any_call(app._util.list_project) + mock_execute.assert_any_call(app._util.list_jobs) + mock_execute.assert_any_call(app._util.list_job_runs) + mock_execute.assert_any_call(app._util.list_buckets) + mock_execute.assert_any_call(app._util.list_log_groups) + mock_execute.assert_any_call(app._util.get_resource_availability, limit_name="ds-gpu-a10-count") + +def test_model_register(app): + with patch_common_app_methods(app) as mocks: + result = app.model_register() + + # Assertions + assert isinstance(result, list) + assert len(result) == 2 + assert result[0]["op"] == "register_model" + assert result[1]["op"] == "delete_model" + + mocks._consent.assert_called_once() + mocks._prompt.assert_called_once() + mocks._test_model_register.assert_called_once_with(bucket="mock-bucket") + mocks._test_delete_model.assert_called_once() + + +def test_model_deployment(app): + with patch_common_app_methods(app) as mocks: + + # Run method + result = app.model_deployment() + + # Assertions + assert len(result) == 4 + assert result[0]["op"] == "register_model" + assert result[1]["op"] == "create_md" + assert result[2]["op"] == "delete_md" + assert result[3]["op"] == "delete_model" + + mocks._consent.assert_called_once() + mocks._prompt.assert_called_once() + mocks._test_model_register.assert_called_once_with(bucket="mock-bucket") + mocks._test_model_deployment.assert_called_once() + mocks._test_delete_model.assert_called_once() + +def test_evaluation(app): + with patch_common_app_methods(app) as mocks: + + result = app.evaluation() + + # Assertions + assert len(result) == 7 + assert result[0]["op"] == "create_mvs" + assert result[1]["op"] == "delete_mvs" + assert result[2]["op"] == "register_model" + assert result[3]["op"] == "delete_model" + assert result[4]["op"] == "create_job" + assert result[5]["op"] == "create_job_run" + assert result[6]["op"] == "delete_job" + + 
+        mocks._consent.assert_called_once()
+        mocks._prompt.assert_called_once()
+        mocks._test_manage_mvs.assert_called_once()
+        mocks._test_model_register.assert_called_once_with(bucket="mock-bucket")
+        mocks._test_delete_model.assert_called_once()
+        mocks._test_manage_job.assert_called_once()
+
+def test_finetune(app):
+    with patch.object(app, "_execute") as mock_execute, \
+         patch_common_app_methods(app) as mocks:
+
+        # Mock manage_bucket execution
+        mock_execute.return_value = (None, MagicMock(
+            status=PolicyStatus.SUCCESS,
+            to_dict=lambda: {"op": "manage_bucket"}
+        ))
+
+        # Call method
+        result = app.finetune()
+
+        # Assertions
+        assert len(result) == 6
+        assert result[0]["op"] == "create_mvs"
+        assert result[1]["op"] == "delete_mvs"
+        assert result[2]["op"] == "create_job"
+        assert result[3]["op"] == "create_job_run"
+        assert result[4]["op"] == "delete_job"
+        assert result[5]["op"] == "manage_bucket"
+
+        mocks._consent.assert_called_once()
+        assert mocks._prompt.call_count == 3
+        mocks._execute.assert_called_once_with(app._util.manage_bucket, bucket="mock-bucket")
+        mocks._test_manage_mvs.assert_called_once()
+        mocks._test_manage_job.assert_called_once()
\ No newline at end of file
diff --git a/tests/unitary/with_extras/aqua/test_verify_policies_utils.py b/tests/unitary/with_extras/aqua/test_verify_policies_utils.py
new file mode 100644
index 000000000..db4bdcc6b
--- /dev/null
+++ b/tests/unitary/with_extras/aqua/test_verify_policies_utils.py
@@ -0,0 +1,189 @@
+import pytest
+from unittest.mock import patch, MagicMock
+from ads.aqua.verify_policies.utils import VerifyPoliciesUtils
+from ads.config import COMPARTMENT_OCID, PROJECT_OCID, TENANCY_OCID, DATA_SCIENCE_SERVICE_NAME
+
+@pytest.fixture
+def utils():
+    return VerifyPoliciesUtils()
+
+def test_list_compartments(utils):
+    mock_compartments = [MagicMock(name="Compartment1"), MagicMock(name="Compartment2")]
+    with patch.object(utils.aqua_model.identity_client, 'list_compartments') as mock_list:
+        mock_list.return_value.data = mock_compartments
+        result = utils.list_compartments()
+        mock_list.assert_called_once_with(compartment_id=TENANCY_OCID, limit=3)
+        assert result == mock_compartments
+
+def test_list_models(utils):
+    mock_models = [MagicMock(name="Model1"), MagicMock(name="Model2")]
+    with patch.object(utils.aqua_model, 'list') as mock_list:
+        mock_list.return_value = mock_models
+        result = utils.list_models(display_name="TestModel")
+        mock_list.assert_called_once_with(display_name="TestModel")
+        assert result == mock_models
+
+def test_list_log_groups(utils):
+    mock_groups = [MagicMock(), MagicMock()]
+    with patch.object(utils.aqua_model.logging_client, 'list_log_groups') as mock_list:
+        mock_list.return_value.data = mock_groups
+        result = utils.list_log_groups()
+        mock_list.assert_called_once_with(compartment_id=COMPARTMENT_OCID, limit=3)
+        assert result == mock_groups
+
+def test_list_log(utils):
+    mock_logs = [MagicMock(), MagicMock()]
+    with patch.object(utils.aqua_model.logging_client, 'list_logs') as mock_list:
+        mock_list.return_value.data = mock_logs
+        result = utils.list_log(log_group_id="dummy")
+        mock_list.assert_called_once_with(log_group_id="dummy", limit=3)
+        assert result == mock_logs
+
+def test_list_project(utils):
+    mock_projects = [MagicMock(), MagicMock()]
+    with patch.object(utils.aqua_model.ds_client, 'list_projects') as mock_list:
+        mock_list.return_value.data = mock_projects
+        result = utils.list_project()
+        mock_list.assert_called_once_with(compartment_id=COMPARTMENT_OCID, limit=3)
+        assert result == mock_projects
+
+def test_list_model_version_sets(utils):
+    mock_sets = [MagicMock(), MagicMock()]
+    with patch.object(utils.aqua_model.ds_client, 'list_model_version_sets') as mock_list:
+        mock_list.return_value.data = mock_sets
+        result = utils.list_model_version_sets()
+        mock_list.assert_called_once_with(compartment_id=COMPARTMENT_OCID, limit=3)
+        assert result == mock_sets
+
+def test_list_jobs(utils):
+    mock_jobs = [MagicMock(), MagicMock()]
+    with patch.object(utils.aqua_model.ds_client, 'list_jobs') as mock_list:
+        mock_list.return_value.data = mock_jobs
+        result = utils.list_jobs()
+        mock_list.assert_called_once_with(compartment_id=COMPARTMENT_OCID, limit=3)
+        assert result == mock_jobs
+
+def test_list_job_runs(utils):
+    mock_job_runs = [MagicMock(), MagicMock()]
+    with patch.object(utils.aqua_model.ds_client, 'list_job_runs') as mock_list:
+        mock_list.return_value.data = mock_job_runs
+        result = utils.list_job_runs()
+        mock_list.assert_called_once_with(compartment_id=COMPARTMENT_OCID, limit=3)
+        assert result == mock_job_runs
+
+def test_list_buckets(utils):
+    mock_buckets = [MagicMock(), MagicMock()]
+    with patch.object(utils.obs_client, 'get_namespace') as mock_namespace, \
+         patch.object(utils.obs_client, 'list_buckets') as mock_list:
+        mock_namespace.return_value.data = "namespace"
+        mock_list.return_value.data = mock_buckets
+        result = utils.list_buckets()
+        mock_list.assert_called_once_with(namespace_name="namespace", compartment_id=COMPARTMENT_OCID, limit=3)
+        assert result == mock_buckets
+
+def test_manage_bucket(utils):
+    with patch.object(utils.obs_client, 'get_namespace') as mock_namespace, \
+         patch.object(utils.obs_client, 'put_object') as mock_put, \
+         patch.object(utils.obs_client, 'delete_object') as mock_delete:
+        mock_namespace.return_value.data = "namespace"
+        result = utils.manage_bucket(bucket="test-bucket")
+        mock_put.assert_called_once()
+        mock_delete.assert_called_once()
+        assert result is True
+
+def test_list_model_deployment_shapes(utils):
+    mock_shapes = [MagicMock(), MagicMock()]
+    with patch.object(utils.aqua_model.ds_client, 'list_model_deployment_shapes') as mock_list:
+        mock_list.return_value.data = mock_shapes
+        result = utils.list_model_deployment_shapes()
+        mock_list.assert_called_once_with(compartment_id=COMPARTMENT_OCID, limit=3)
+        assert result == mock_shapes
+
+def test_get_resource_availability(utils):
+    mock_availability = MagicMock()
+    with patch('ads.aqua.verify_policies.utils.oci_client.OCIClientFactory') as mock_factory:
+        mock_instance = mock_factory.return_value
+        mock_instance.limits.get_resource_availability.return_value.data = mock_availability
+        result = utils.get_resource_availability(limit_name="test_limit")
+        mock_instance.limits.get_resource_availability.assert_called_once_with(
+            compartment_id=COMPARTMENT_OCID,
+            service_name=DATA_SCIENCE_SERVICE_NAME,
+            limit_name="test_limit"
+        )
+        assert result == mock_availability
+
+
+def test_register_model(utils):
+    with patch.object(utils.aqua_model.ds_client, 'create_model') as mock_create_model, \
+         patch.object(utils.aqua_model.ds_client, 'create_model_artifact') as mock_create_artifact:
+
+        mock_model_response = MagicMock()
+        mock_model_response.id = "mock_model_id"
+        mock_create_model.return_value.data = mock_model_response
+
+        result = utils.register_model()
+
+        mock_create_model.assert_called_once()
+        mock_create_artifact.assert_called_once_with(model_id="mock_model_id", model_artifact=b"7IV6cktUGcHIhur4bXTv")
+        assert result == "mock_model_id"
+
+def test_create_model_deployment(utils):
+    with patch.object(utils.aqua_model.ds_client, 'create_model_deployment') as mock_create_md, \
+         patch.object(utils.aqua_model.ds_client, 'get_model_deployment') as mock_get_md, \
+         patch('oci.wait_until') as mock_wait:
+
+        mock_md_response = MagicMock()
+        mock_md_response.data.id = "mock_deployment_id"
+        mock_create_md.return_value = mock_md_response
+
+        mock_get_md.return_value = MagicMock(data=MagicMock(lifecycle_state="SUCCEEDED"))
+        mock_wait.return_value = MagicMock(data=MagicMock(lifecycle_state="SUCCEEDED"))
+
+        result = utils.create_model_deployment(model_id="mock_model_id", instance_shape="VM.Standard2.1")
+
+        assert result == "mock_deployment_id"
+        mock_create_md.assert_called_once()
+
+def test_create_job(utils):
+    with patch.object(utils.aqua_model.ds_client, 'create_job') as mock_create_job, \
+         patch.object(utils.aqua_model.ds_client, 'create_job_artifact') as mock_artifact:
+
+        mock_job_response = MagicMock()
+        mock_job_response.data.id = "mock_job_id"
+        mock_create_job.return_value = mock_job_response
+
+        result = utils.create_job(display_name="Test Job", subnet_id="subnet123")
+
+        assert result == "mock_job_id"
+        mock_create_job.assert_called_once()
+        mock_artifact.assert_called_once_with(
+            job_id="mock_job_id", job_artifact=b"echo OK\n", content_disposition="attachment; filename=entry.sh"
+        )
+
+def test_create_job_run(utils):
+    with patch.object(utils.aqua_model.ds_client, 'create_job_run') as mock_create_run, \
+         patch.object(utils.aqua_model.ds_client, 'get_job_run') as mock_get_run, \
+         patch('oci.wait_until') as mock_wait:
+
+        mock_run_response = MagicMock()
+        mock_run_response.data.id = "mock_job_run_id"
+        mock_create_run.return_value = mock_run_response
+        mock_get_run.return_value = MagicMock(data=MagicMock(lifecycle_state="SUCCEEDED"))
+        mock_wait.return_value = MagicMock(data=MagicMock(lifecycle_state="SUCCEEDED"))
+
+        result = utils.create_job_run(job_id="mock_job_id", display_name="Test Run")
+
+        assert result.data.lifecycle_state == "SUCCEEDED"
+        mock_create_run.assert_called_once()
+
+def test_create_model_version_set(utils):
+    with patch.object(utils.aqua_model, 'create_model_version_set') as mock_create_mvs:
+        mock_response = MagicMock()
+        mock_create_mvs.return_value = mock_response
+        result = utils.create_model_version_set(name="TestMVS")
+        mock_create_mvs.assert_called_once_with(
+            model_version_set_name="TestMVS",
+            compartment_id=COMPARTMENT_OCID,
+            project_id=PROJECT_OCID
+        )
+        assert result == mock_response
\ No newline at end of file

From 201a3c4e128f5586b024657c8e1720544a0c4712 Mon Sep 17 00:00:00 2001
From: Amit Prajapati
Date: Thu, 3 Jul 2025 22:50:35 +0530
Subject: [PATCH 3/4] fix for create mvs

---
 ads/aqua/verify_policies/verify.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/ads/aqua/verify_policies/verify.py b/ads/aqua/verify_policies/verify.py
index 68a14878e..7aa3e5d44 100644
--- a/ads/aqua/verify_policies/verify.py
+++ b/ads/aqua/verify_policies/verify.py
@@ -166,8 +166,7 @@ def _test_manage_mvs(self, **kwargs):
         """
         logger.info(f"Creating Model Version set with name {TEST_MVS_NAME}")
-        model_mvs, test_create_mvs = self._execute(self._util.aqua_model.create_model_version_set,
-                                                   name=TEST_MVS_NAME)
+        model_mvs, test_create_mvs = self._execute(self._util.create_model_version_set, name=TEST_MVS_NAME)
         model_mvs_id = model_mvs[0]
         if model_mvs_id:
             logger.info(f"Deleting Model Version set {TEST_MVS_NAME}")

From 979e52b7d6ef9b2684c9330e3d32a2dd245a78af Mon Sep 17 00:00:00 2001
From: Amit Prajapati
Date: Fri, 4 Jul 2025 14:18:18 +0530
Subject: [PATCH 4/4] Add subnet

---
 ads/aqua/verify_policies/utils.py  |  3 ++-
 ads/aqua/verify_policies/verify.py | 17 +++++++++++------
 2 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/ads/aqua/verify_policies/utils.py b/ads/aqua/verify_policies/utils.py
index f280ecb8f..f42706149 100644
--- a/ads/aqua/verify_policies/utils.py
+++ b/ads/aqua/verify_policies/utils.py
@@ -164,6 +164,7 @@ def create_job(self, **kwargs):
         shape_name = kwargs.pop("shape_name", TEST_DEFAULT_JOB_SHAPE)
         display_name = kwargs.pop("display_name")
         subnet_id = kwargs.pop("subnet_id", None)
+        job_infrastructure_type = "STANDALONE" if subnet_id is not None else "ME_STANDALONE"

         response = self.aqua_model.ds_client.create_job(
             create_job_details=oci.data_science.models.CreateJobDetails(
@@ -174,7 +175,7 @@ def create_job(self, **kwargs):
                 job_type="DEFAULT",
                 environment_variables={}),
             job_infrastructure_configuration_details=oci.data_science.models.StandaloneJobInfrastructureConfigurationDetails(
-                job_infrastructure_type="ME_STANDALONE",
+                job_infrastructure_type=job_infrastructure_type,
                 shape_name=shape_name,
                 subnet_id=subnet_id,
                 job_shape_config_details=oci.data_science.models.JobShapeConfigDetails(
diff --git a/ads/aqua/verify_policies/verify.py b/ads/aqua/verify_policies/verify.py
index 7aa3e5d44..76bf8e2f2 100644
--- a/ads/aqua/verify_policies/verify.py
+++ b/ads/aqua/verify_policies/verify.py
@@ -164,12 +164,12 @@ def _test_manage_mvs(self, **kwargs):
         Returns:
             List of result dicts for MVS creation and deletion.
         """
-        logger.info(f"Creating Model Version set with name {TEST_MVS_NAME}")
+        logger.info(f"Creating ModelVersionSet with name {TEST_MVS_NAME}")
         model_mvs, test_create_mvs = self._execute(self._util.create_model_version_set, name=TEST_MVS_NAME)
         model_mvs_id = model_mvs[0]
         if model_mvs_id:
-            logger.info(f"Deleting Model Version set {TEST_MVS_NAME}")
+            logger.info(f"Deleting ModelVersionSet {TEST_MVS_NAME}")
             _, delete_mvs = self._execute(self._util.aqua_model.ds_client.delete_model_version_set,
                                           model_version_set_id=model_mvs_id)
         else:
@@ -184,15 +184,20 @@ def _test_manage_job(self, **kwargs):
             List of result dicts for job creation, job run creation, and job deletion.
         """
+        logger.info(f"Creating Job with name {TEST_JOB_NAME}")
+
         # Create Job & JobRun.
-        evaluation_job_id, test_create_job = self._execute(self._util.create_job, display_name=TEST_JOB_NAME,
+        job_id, test_create_job = self._execute(self._util.create_job, display_name=TEST_JOB_NAME,
                                                 **kwargs)
+
+        logger.info(f"Creating JobRun with name {TEST_JOB_RUN_NAME}")
+
         _, test_create_job_run = self._execute(self._util.create_job_run, display_name=TEST_JOB_RUN_NAME,
-                                               job_id=evaluation_job_id, **kwargs)
+                                               job_id=job_id, **kwargs)

         # Delete Job Run
-        if evaluation_job_id:
-            _, delete_job = self._execute(self._util.aqua_model.ds_client.delete_job, job_id=evaluation_job_id)
+        if job_id:
+            _, delete_job = self._execute(self._util.aqua_model.ds_client.delete_job, job_id=job_id, delete_related_job_runs=True)
         else:
             delete_job = self._get_operation_result(self._util.aqua_model.ds_client.delete_job, PolicyStatus.UNVERIFIED)