Skip to content

Commit af106a7

Browse files
committed
feat: update scan plan instrumentation. Add o11y integ test. Add Jaeger to integ test docker stack
1 parent 0a8872f commit af106a7

File tree

18 files changed

+913
-75
lines changed

18 files changed

+913
-75
lines changed

Cargo.lock

Lines changed: 294 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/iceberg/src/expr/visitors/expression_evaluator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ impl ExpressionEvaluator {
4343
/// the provided [`DataFile`]'s partition [`Struct`]. Used by [`TableScan`]
4444
/// to see if this [`DataFile`] could possibly contain data that matches
4545
/// the scan's filter.
46-
#[tracing::instrument(skip_all)]
46+
#[tracing::instrument(skip_all, level = "trace")]
4747
pub(crate) fn eval(&self, data_file: &DataFile) -> Result<bool> {
4848
let mut visitor = ExpressionEvaluatorVisitor::new(data_file.partition());
4949

crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ impl<'a> InclusiveMetricsEvaluator<'a> {
3939
/// provided [`DataFile`]'s metrics. Used by [`TableScan`] to
4040
/// see if this `DataFile` contains data that could match
4141
/// the scan's filter.
42-
#[tracing::instrument(skip_all)]
42+
#[tracing::instrument(skip_all, level = "trace")]
4343
pub(crate) fn eval(
4444
filter: &'a BoundPredicate,
4545
data_file: &'a DataFile,

crates/iceberg/src/io/object_cache.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ impl ObjectCache {
8585

8686
/// Retrieves an Arc [`Manifest`] from the cache
8787
/// or retrieves one from FileIO and parses it if not present
88-
#[tracing::instrument(skip_all)]
88+
#[tracing::instrument(skip_all, level = "debug")]
8989
pub(crate) async fn get_manifest(&self, manifest_file: &ManifestFile) -> Result<Arc<Manifest>> {
9090
if self.cache_disabled {
9191
return manifest_file

crates/iceberg/src/scan/context.rs

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::sync::Arc;
1919

2020
use futures::StreamExt;
2121
use futures::stream::BoxStream;
22+
use tracing::field::Empty;
2223

2324
use crate::delete_file_index::DeleteFileIndex;
2425
use crate::expr::{Bind, BoundPredicate, Predicate};
@@ -53,14 +54,26 @@ pub(crate) struct ManifestEntryContext {
5354
pub bound_predicates: Option<Arc<BoundPredicates>>,
5455
pub partition_spec_id: i32,
5556
pub snapshot_schema: SchemaRef,
57+
pub(crate) span: tracing::Span,
5658
}
5759

5860
impl ManifestFileContext {
5961
/// Consumes this [`ManifestFileContext`], fetching its Manifest from FileIO and then
6062
/// streaming its constituent [`ManifestEntries`]
6163
pub(crate) async fn fetch_manifest_and_stream_entries(
6264
self,
65+
parent_span: tracing::Span,
6366
) -> Result<BoxStream<'static, Result<ManifestEntryContext>>> {
67+
let manifest_span = tracing::debug_span!(
68+
parent: parent_span,
69+
"iceberg.scan.plan.process_manifest",
70+
iceberg.scan.plan.manifest.file_path = self.manifest_file.manifest_path,
71+
iceberg.scan.plan.manifest.entries_count = Empty,
72+
);
73+
74+
let span = manifest_span.clone();
75+
let _guard = manifest_span.enter();
76+
6477
let ManifestFileContext {
6578
object_cache,
6679
manifest_file,
@@ -73,15 +86,30 @@ impl ManifestFileContext {
7386

7487
let manifest = object_cache.get_manifest(&manifest_file).await?;
7588

89+
span.record(
90+
"iceberg.scan.plan.manifest.entries_count",
91+
manifest.entries().len(),
92+
);
93+
7694
Ok(async_stream::stream! {
7795
for manifest_entry in manifest.entries() {
96+
let manifest_entry_span = tracing::debug_span!(
97+
parent: span.clone(),
98+
"iceberg.scan.plan.process_data_file",
99+
iceberg.scan.plan.data_file.file_path = manifest_entry.file_path(),
100+
"iceberg.scan.plan_data_file.type" = Empty,
101+
iceberg.scan.plan.data_file.skipped = Empty,
102+
iceberg.scan.plan.data_file.skipped_reason = Empty,
103+
);
104+
78105
yield Ok(ManifestEntryContext {
79106
manifest_entry: manifest_entry.clone(),
80107
expression_evaluator_cache: expression_evaluator_cache.clone(),
81108
field_ids: field_ids.clone(),
82109
partition_spec_id: manifest_file.partition_spec_id,
83110
bound_predicates: bound_predicates.clone(),
84111
snapshot_schema: snapshot_schema.clone(),
112+
span: manifest_entry_span,
85113
});
86114
}
87115
}
@@ -144,7 +172,11 @@ pub(crate) struct PlanContext {
144172
}
145173

146174
impl PlanContext {
147-
#[tracing::instrument(skip_all)]
175+
#[tracing::instrument(
176+
skip_all,
177+
level = "debug",
178+
fields(iceberg.scan.plan.manifest_list.file_path = ?self.snapshot.manifest_list()),
179+
)]
148180
pub(crate) async fn get_manifest_list(&self) -> Result<Arc<ManifestList>> {
149181
self.object_cache
150182
.as_ref()
@@ -175,14 +207,19 @@ impl PlanContext {
175207

176208
#[tracing::instrument(
177209
skip_all,
210+
level = "debug",
211+
name = "iceberg.scan.plan.process_manifest_list",
178212
fields(
179-
manifest_list.len = manifest_list.entries().len(),
213+
iceberg.scan.plan.manifest_list.entries_count = manifest_list.entries().len(),
180214
)
181215
)]
182-
pub(crate) fn build_manifest_file_context_iter(
216+
pub(crate) fn build_manifest_file_contexts(
183217
&self,
184218
manifest_list: Arc<ManifestList>,
185-
) -> impl Iterator<Item = Result<ManifestFileContext>> {
219+
) -> (
220+
Vec<Result<ManifestFileContext>>,
221+
Vec<Result<ManifestFileContext>>,
222+
) {
186223
let has_predicate = self.predicate.is_some();
187224

188225
(0..manifest_list.entries().len())
@@ -198,24 +235,28 @@ impl PlanContext {
198235
.get(manifest_file.partition_spec_id, predicate.clone())
199236
.eval(&manifest_file)?
200237
{
201-
tracing::trace!(file_path = manifest_file.manifest_path, "iceberg.scan.manifest_file.skipped");
202-
metrics::counter!("iceberg.scan.manifest_file.skipped", "reason" => "partition").increment(1);
238+
tracing::debug!(
239+
iceberg.scan.plan.manifest.file_path = manifest_file.manifest_path,
240+
iceberg.scan.plan.manifest.skip_reason = "partition",
241+
"iceberg.scan.plan.manifest_file.skipped"
242+
);
243+
metrics::counter!("iceberg.scan.plan.manifest_file.skipped", "reason" => "partition").increment(1);
203244
return Ok(None); // Skip this file.
204245
}
205246
Some(predicate)
206247
} else {
207248
None
208249
};
209250

210-
tracing::trace!(file_path = manifest_file.manifest_path, "iceberg.scan.manifest_file.included");
211-
metrics::counter!("iceberg.scan.manifest_file.included").increment(1);
251+
metrics::counter!("iceberg.scan.plan.manifest_file.included").increment(1);
212252

213253
let context = self
214254
.create_manifest_file_context(manifest_file, partition_bound_predicate)?;
215255
Ok(Some(context))
216256
})()
217257
.transpose()
218258
})
259+
.partition(|ctx| ctx.as_ref().map_or(true, |ctx| ctx.is_delete()))
219260
}
220261

221262
fn create_manifest_file_context(

0 commit comments

Comments
 (0)