Skip to content

Convert get database operation to pipeline architecture #286

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion sdk/core/src/client_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,5 @@ pub struct ClientOptions {
// TODO: Expose retry options and transport overrides.
pub per_call_policies: Vec<Arc<dyn Policy>>,
pub per_retry_policies: Vec<Arc<dyn Policy>>,

pub telemetry: TelemetryOptions,
}
24 changes: 20 additions & 4 deletions sdk/core/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ impl ResponseBuilder {
pub struct Response {
status: StatusCode,
headers: HeaderMap,
response: PinnedStream,
body: PinnedStream,
}

impl Response {
fn new(status: StatusCode, headers: HeaderMap, response: PinnedStream) -> Self {
fn new(status: StatusCode, headers: HeaderMap, body: PinnedStream) -> Self {
Self {
status,
headers,
response,
body,
}
}

Expand All @@ -58,7 +58,23 @@ impl Response {
}

pub fn deconstruct(self) -> (StatusCode, HeaderMap, PinnedStream) {
(self.status, self.headers, self.response)
(self.status, self.headers, self.body)
}

pub async fn validate(self, expected_status: StatusCode) -> Result<Self, crate::HttpError> {
let status = self.status();
if expected_status != status {
let body = collect_pinned_stream(self.body)
.await
.unwrap_or_else(|_| Bytes::from_static("<INVALID BODY>".as_bytes()));
Err(crate::HttpError::new_unexpected_status_code(
expected_status,
status,
std::str::from_utf8(&body as &[u8]).unwrap_or("<NON-UTF8 BODY>"),
))
} else {
Ok(self)
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions sdk/cosmos/examples/collection.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use azure_core::prelude::*;
use azure_cosmos::prelude::*;
use std::error::Error;

Expand Down Expand Up @@ -43,8 +44,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let db = client
.clone()
.into_database_client(db.id.clone())
.get_database()
.execute()
.get_database(Context::new(), GetDatabaseOptions::default())
.await?;
println!("db {} found == {:?}", &db.database.id, &db);
}
Expand Down
15 changes: 2 additions & 13 deletions sdk/cosmos/examples/create_delete_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use azure_cosmos::prelude::*;
use futures::stream::StreamExt;

use std::error::Error;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
Expand Down Expand Up @@ -32,11 +31,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
// authorization token at later time if you need, for example, to escalate the privileges for a
// single operation.
let http_client = azure_core::new_http_client();
let client = CosmosClient::new(
http_client.clone(),
account.clone(),
authorization_token.clone(),
);
let client = CosmosClient::new(http_client, account, authorization_token);

// The Cosmos' client exposes a lot of methods. This one lists the databases in the specified
// account. Database do not implement Display but deref to &str so you can pass it to methods
Expand All @@ -45,13 +40,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let list_databases_response = client.list_databases().execute().await?;
println!("list_databases_response = {:#?}", list_databases_response);

let cosmos_client = CosmosClient::with_pipeline(
http_client,
account,
authorization_token,
CosmosOptions::with_client(Arc::new(reqwest::Client::new())),
);
let db = cosmos_client
let db = client
.create_database(
azure_core::Context::new(),
&database_name,
Expand Down
9 changes: 1 addition & 8 deletions sdk/cosmos/examples/document_00.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use azure_cosmos::prelude::*;
use azure_cosmos::responses::GetDocumentResponse;
use std::borrow::Cow;
use std::error::Error;
use std::sync::Arc;

#[derive(Clone, Serialize, Deserialize, Debug)]
struct MySampleStruct<'a> {
Expand Down Expand Up @@ -67,17 +66,11 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
.into_iter()
.find(|db| db.id == DATABASE);

let database_client = CosmosClient::with_pipeline(
http_client,
account,
authorization_token,
CosmosOptions::with_client(Arc::new(reqwest::Client::new())),
);
// If the requested database is not found we create it.
let database = match db {
Some(db) => db,
None => {
database_client
client
.create_database(Context::new(), DATABASE, CreateDatabaseOptions::new())
.await?
.database
Expand Down
5 changes: 4 additions & 1 deletion sdk/cosmos/examples/get_database.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use azure_core::prelude::*;
use azure_cosmos::prelude::*;
use std::error::Error;

Expand All @@ -20,7 +21,9 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {

let database_client = client.into_database_client(database_name.clone());

let response = database_client.get_database().execute().await?;
let response = database_client
.get_database(Context::new(), GetDatabaseOptions::new())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For client and method options, do we want to standardize on new or default? See my comment above for context. In other SDKs, this argument is typically optional e.g. default parameter value in .NET, overload in Java, kwargs in Python, and options bag in JS, hence I was thinking default.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some options won't be able to have default implementations because they have required arguments. If we opt for Default, we won't be able to do that for all option types.

Unfortunately since Rust doesn't have default params, this is fairly verbose and Default impls don't help necessarily. We could have one method get_database that only takes a context and default the options, and a get_database_with_options which takes an options param.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally, if a parameter is required (hence making it required to create options) it's its own parameter. Eventually we'll want to be consistent with the other SDKs when not idiomatic e.g. which parameters are parameters and which are in some sort of options bag.

.await?;
println!("response == {:?}", response);

Ok(())
Expand Down
5 changes: 4 additions & 1 deletion sdk/cosmos/examples/permission_00.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use azure_core::prelude::*;
use azure_cosmos::prelude::*;
use std::error::Error;

Expand Down Expand Up @@ -36,7 +37,9 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
.into_collection_client(collection_name2);
let user_client = database_client.clone().into_user_client(user_name);

let get_database_response = database_client.get_database().execute().await?;
let get_database_response = database_client
.get_database(Context::new(), GetDatabaseOptions::new())
.await?;
println!("get_database_response == {:#?}", get_database_response);

let get_collection_response = collection_client.get_collection().execute().await?;
Expand Down
30 changes: 12 additions & 18 deletions sdk/cosmos/src/clients/cosmos_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub struct CosmosClient {
auth_token: AuthorizationToken,
cloud_location: CloudLocation,
}

/// Options for specifying how a Cosmos client will behave
pub struct CosmosOptions {
options: ClientOptions,
Expand Down Expand Up @@ -142,22 +143,6 @@ impl CosmosClient {
}
}

/// Construct a pipeline with explicit options
pub fn with_pipeline(
http_client: Arc<dyn HttpClient>,
account: String, // TODO: this will eventually be a URL
auth_token: AuthorizationToken,
options: CosmosOptions,
) -> Self {
let pipeline = new_pipeline_from_options(options);
Self {
http_client,
pipeline,
auth_token,
cloud_location: CloudLocation::Public(account),
}
}

/// Set the auth token used
pub fn auth_token(&mut self, auth_token: AuthorizationToken) {
self.auth_token = auth_token;
Expand All @@ -177,7 +162,9 @@ impl CosmosClient {
.pipeline()
.send(&mut ctx, &mut request)
.await
.map_err(crate::Error::PolicyError)?;
.map_err(crate::Error::PolicyError)?
.validate(http::StatusCode::CREATED)
.await?;

Ok(CreateDatabaseResponse::try_from(response).await?)
}
Expand All @@ -196,6 +183,11 @@ impl CosmosClient {
DatabaseClient::new(self, database_name)
}

/// Prepares an `http::RequestBuilder`.
///
/// TODO: Remove once all operations have been moved to pipeline architecture. This is used by
/// legacy operations that have not moved to the use of the pipeline architecture. Once
/// that is complete, this will be superceded by `prepare_request2`.
pub(crate) fn prepare_request(
&self,
uri_path: &str,
Expand All @@ -217,7 +209,9 @@ impl CosmosClient {
self.prepare_request_with_signature(uri_path, http_method, &time, &auth)
}

// Eventually this method will replace `prepare_request` fully
/// Prepares' an `azure_core::Request`.
///
/// Note: Eventually this method will replace `prepare_request` fully
pub(crate) fn prepare_request2(
&self,
uri_path: &str,
Expand Down
39 changes: 28 additions & 11 deletions sdk/cosmos/src/clients/database_client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use super::*;
use crate::operations::*;
use crate::requests;
use crate::resources::ResourceType;
use crate::ReadonlyString;
use crate::{requests, ReadonlyString};

use azure_core::pipeline::Pipeline;
use azure_core::{Context, HttpClient, Request};
use azure_core::{Context, HttpClient};

/// A client for Cosmos database resources.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -35,8 +35,26 @@ impl DatabaseClient {
}

/// Get the database
pub fn get_database(&self) -> requests::GetDatabaseBuilder<'_, '_> {
requests::GetDatabaseBuilder::new(self)
pub async fn get_database(
&self,
mut ctx: Context,
options: GetDatabaseOptions,
) -> Result<GetDatabaseResponse, crate::Error> {
let mut request = self
.prepare_request_with_database_name(http::Method::GET)
.body(bytes::Bytes::new())
.unwrap()
.into();
options.decorate_request(&mut request)?;
let response = self
.pipeline()
.send(&mut ctx, &mut request)
.await
.map_err(crate::Error::PolicyError)?
.validate(http::StatusCode::OK)
.await?;
Comment on lines +43 to +55
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is something the pipeline should actually do, and the types of errors, which can be retried, etc. is typically handled by the pipeline. Again, we can do this in a separate PR, but I bring it up as a design point. Having every single method repeat this is burdensome and exactly what the pipeline was meant to handle. For an example reference, see .NETs: https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/core/Azure.Core/src/Pipeline/HttpPipeline.cs. Our other language SDKs use something similar.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you talking about the validation? This something that depends on the specific operation being performed, so I don't think we can move this into the pipeline. I agree it's currently fairly verbose, but I'm not sure the right way to solve that is moving it into pipelines rather than moving some of this into a function somewhere.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of our SDKs use a response classifier they can pass in when building the pipeline. We don't necessarily have to be the same, but - even if validate is kept separate here - serializing and sending the request, and deserializing and handling the response should be done in the pipeline as much as possible. We do have some clients (Key Vault does it in a couple places) that will occasionally "override" that behavior, but it's pretty rare we need to.


Ok(GetDatabaseResponse::try_from(response).await?)
}

/// List collections in the database
Expand All @@ -52,24 +70,23 @@ impl DatabaseClient {
/// Create a collection
pub async fn create_collection<S: AsRef<str>>(
&self,
ctx: Context,
mut ctx: Context,
collection_name: S,
options: CreateCollectionOptions,
) -> Result<CreateCollectionResponse, crate::Error> {
let request = self.cosmos_client().prepare_request(
let mut request = self.cosmos_client().prepare_request2(
&format!("dbs/{}/colls", self.database_name()),
http::Method::POST,
ResourceType::Collections,
);
let mut request: Request = request.body(bytes::Bytes::new()).unwrap().into();

let mut ctx = ctx.clone();
options.decorate_request(&mut request, collection_name.as_ref())?;
let response = self
.pipeline()
.send(&mut ctx, &mut request)
.await
.map_err(crate::Error::PolicyError)?;
.map_err(crate::Error::PolicyError)?
.validate(http::StatusCode::CREATED)
.await?;

Ok(CreateCollectionResponse::try_from(response).await?)
}
Expand Down
1 change: 0 additions & 1 deletion sdk/cosmos/src/consistency_level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ macro_rules! implement_from {
}

implement_from!(CreateSlugAttachmentResponse);
implement_from!(GetDatabaseResponse);
implement_from!(GetCollectionResponse);
implement_from!(CreateUserResponse);
implement_from!(DeleteAttachmentResponse);
Expand Down
2 changes: 0 additions & 2 deletions sdk/cosmos/src/operations/create_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ impl CreateCollectionOptions {
indexing_policy: IndexingPolicy => Some(indexing_policy),
offer: Offer => Some(offer),
}
}

impl CreateCollectionOptions {
pub(crate) fn decorate_request(
&self,
request: &mut HttpRequest,
Expand Down
67 changes: 67 additions & 0 deletions sdk/cosmos/src/operations/get_database.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use crate::headers::from_headers::*;
use crate::prelude::*;
use crate::ResourceQuota;

use azure_core::headers::{etag_from_headers, session_token_from_headers};
use azure_core::{collect_pinned_stream, Request as HttpRequest, Response as HttpResponse};
use chrono::{DateTime, Utc};

#[derive(Debug, Clone, Default)]
pub struct GetDatabaseOptions {
consistency_level: Option<ConsistencyLevel>,
}

impl GetDatabaseOptions {
pub fn new() -> Self {
Self {
consistency_level: None,
}
}

setters! {
consistency_level: ConsistencyLevel => Some(consistency_level),
}

pub(crate) fn decorate_request(&self, request: &mut HttpRequest) -> Result<(), crate::Error> {
azure_core::headers::add_optional_header2(&self.consistency_level, request);
request.set_body(bytes::Bytes::from_static(&[]).into());

Ok(())
}
}

#[derive(Debug, Clone)]
pub struct GetDatabaseResponse {
pub database: Database,
pub charge: f64,
pub activity_id: uuid::Uuid,
pub session_token: String,
pub etag: String,
pub last_state_change: DateTime<Utc>,
pub resource_quota: Vec<ResourceQuota>,
pub resource_usage: Vec<ResourceQuota>,
pub schema_version: String,
pub service_version: String,
pub gateway_version: String,
}

impl GetDatabaseResponse {
pub async fn try_from(response: HttpResponse) -> Result<Self, crate::Error> {
let (_status_code, headers, pinned_stream) = response.deconstruct();
let body = collect_pinned_stream(pinned_stream).await?;

Ok(Self {
database: serde_json::from_slice(&body)?,
charge: request_charge_from_headers(&headers)?,
activity_id: activity_id_from_headers(&headers)?,
session_token: session_token_from_headers(&headers)?,
etag: etag_from_headers(&headers)?,
last_state_change: last_state_change_from_headers(&headers)?,
resource_quota: resource_quota_from_headers(&headers)?,
resource_usage: resource_usage_from_headers(&headers)?,
schema_version: schema_version_from_headers(&headers)?.to_owned(),
service_version: service_version_from_headers(&headers)?.to_owned(),
gateway_version: gateway_version_from_headers(&headers)?.to_owned(),
})
}
}
2 changes: 2 additions & 0 deletions sdk/cosmos/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

mod create_collection;
mod create_database;
mod get_database;

pub use create_collection::*;
pub use create_database::*;
pub use get_database::*;
Loading