From 0e9e4c726adabd04599e11c4e074e2ac192bc957 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 12 Feb 2025 21:24:00 +0800 Subject: [PATCH 1/5] add position delete file writer --- crates/iceberg/src/writer/base_writer/mod.rs | 1 + .../position_delete_file_writer.rs | 245 ++++++++++++++++++ 2 files changed, 246 insertions(+) create mode 100644 crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs diff --git a/crates/iceberg/src/writer/base_writer/mod.rs b/crates/iceberg/src/writer/base_writer/mod.rs index 37ab97eb6d..a5f2adc676 100644 --- a/crates/iceberg/src/writer/base_writer/mod.rs +++ b/crates/iceberg/src/writer/base_writer/mod.rs @@ -19,3 +19,4 @@ pub mod data_file_writer; pub mod equality_delete_writer; +pub mod position_delete_file_writer; diff --git a/crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs b/crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs new file mode 100644 index 0000000000..0a44b2d637 --- /dev/null +++ b/crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs @@ -0,0 +1,245 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Position delete file writer. +use std::sync::Arc; + +use arrow_array::builder::{PrimitiveBuilder, StringBuilder}; +use arrow_array::types::Int64Type; +use arrow_array::RecordBatch; +use once_cell::sync::Lazy; + +use crate::arrow::schema_to_arrow_schema; +use crate::spec::{DataContentType, DataFile, NestedField, PrimitiveType, Schema, Struct, Type}; +use crate::writer::file_writer::{FileWriter, FileWriterBuilder}; +use crate::writer::{IcebergWriter, IcebergWriterBuilder}; +use crate::{Error, ErrorKind, Result}; + +static POSITION_DELETE_SCHEMA: Lazy = Lazy::new(|| { + Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::required( + 2147483546, + "file_path", + Type::Primitive(PrimitiveType::String), + )), + Arc::new(NestedField::required( + 2147483545, + "pos", + Type::Primitive(PrimitiveType::Long), + )), + ]) + .build() + .unwrap() +}); + +/// Position delete input. +#[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Debug)] +pub struct PositionDeleteInput { + /// The path of the file. + pub path: String, + /// The offset of the position delete. + pub offsets: Vec, +} + +impl PositionDeleteInput { + /// Create a new `PositionDeleteInput`. + pub fn new(path: String, offsets: Vec) -> Self { + PositionDeleteInput { path, offsets } + } +} +/// Builder for `MemoryPositionDeleteWriter`. +#[derive(Clone)] +pub struct PositionDeleteWriterBuilder { + inner: B, + partition_value: Option, +} + +impl PositionDeleteWriterBuilder { + /// Create a new `MemoryPositionDeleteWriterBuilder` using a `FileWriterBuilder`. + pub fn new(inner: B, partition_value: Option) -> Self { + Self { + inner, + partition_value, + } + } +} + +#[async_trait::async_trait] +impl IcebergWriterBuilder> + for PositionDeleteWriterBuilder +{ + type R = PositionDeleteWriter; + + async fn build(self) -> Result { + Ok(PositionDeleteWriter { + inner_writer: Some(self.inner.build().await?), + partition_value: self.partition_value.unwrap_or(Struct::empty()), + }) + } +} + +/// Position delete writer. +pub struct PositionDeleteWriter { + inner_writer: Option, + partition_value: Struct, +} + +#[async_trait::async_trait] +impl IcebergWriter> for PositionDeleteWriter { + async fn write(&mut self, input: Vec) -> Result<()> { + let mut path_column_builder = StringBuilder::new(); + let mut offset_column_builder = PrimitiveBuilder::::new(); + for input in input.into_iter() { + for offset in input.offsets { + path_column_builder.append_value(&input.path); + offset_column_builder.append_value(offset); + } + } + let record_batch = RecordBatch::try_new( + Arc::new(schema_to_arrow_schema(&POSITION_DELETE_SCHEMA).unwrap()), + vec![ + Arc::new(path_column_builder.finish()), + Arc::new(offset_column_builder.finish()), + ], + ) + .map_err(|e| Error::new(ErrorKind::DataInvalid, e.to_string()))?; + + if let Some(inner_writer) = &mut self.inner_writer { + inner_writer.write(&record_batch).await?; + } else { + return Err(Error::new(ErrorKind::Unexpected, "write has been closed")); + } + Ok(()) + } + + async fn close(&mut self) -> Result> { + let writer = self.inner_writer.take().unwrap(); + Ok(writer + .close() + .await? + .into_iter() + .map(|mut res| { + res.content(DataContentType::PositionDeletes); + res.partition(self.partition_value.clone()); + res.build().expect("Guaranteed to be valid") + }) + .collect()) + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow_array::{Int64Array, StringArray}; + use itertools::Itertools; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use crate::io::FileIOBuilder; + use crate::spec::{DataContentType, DataFileFormat, Struct}; + use crate::writer::base_writer::position_delete_file_writer::{ + PositionDeleteInput, PositionDeleteWriterBuilder, POSITION_DELETE_SCHEMA, + }; + use crate::writer::file_writer::location_generator::test::MockLocationGenerator; + use crate::writer::file_writer::location_generator::DefaultFileNameGenerator; + use crate::writer::file_writer::ParquetWriterBuilder; + use crate::writer::{IcebergWriter, IcebergWriterBuilder}; + use crate::Result; + + #[tokio::test] + async fn test_position_delete_writer() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let location_gen = + MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + let pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + Arc::new(POSITION_DELETE_SCHEMA.clone()), + file_io.clone(), + location_gen, + file_name_gen, + ); + let mut position_delete_writer = PositionDeleteWriterBuilder::new(pw, None).build().await?; + + // Write some position delete inputs + let inputs: Vec = vec![ + PositionDeleteInput { + path: "file2.parquet".to_string(), + offsets: vec![2, 1, 3], + }, + PositionDeleteInput { + path: "file3.parquet".to_string(), + offsets: vec![2], + }, + PositionDeleteInput { + path: "file1.parquet".to_string(), + offsets: vec![5, 4, 1], + }, + ]; + let expect_inputs = inputs + .clone() + .into_iter() + .flat_map(|input| { + input + .offsets + .iter() + .map(|off| (input.path.clone(), *off)) + .collect::>() + }) + .collect_vec(); + position_delete_writer.write(inputs.clone()).await?; + + let data_files = position_delete_writer.close().await.unwrap(); + assert_eq!(data_files.len(), 1); + assert_eq!(data_files[0].file_format, DataFileFormat::Parquet); + assert_eq!(data_files[0].content, DataContentType::PositionDeletes); + assert_eq!(data_files[0].partition, Struct::empty()); + + let parquet_file = file_io + .new_input(&data_files[0].file_path)? + .read() + .await + .unwrap(); + let builder = ParquetRecordBatchReaderBuilder::try_new(parquet_file).unwrap(); + let reader = builder.build().unwrap(); + let batches = reader.map(|x| x.unwrap()).collect::>(); + + let path_column = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let offset_column = batches[0] + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + for (i, (path, offset)) in expect_inputs.into_iter().enumerate() { + assert_eq!(path_column.value(i), path); + assert_eq!(offset_column.value(i), offset); + } + + Ok(()) + } +} From cc1e79c3018b966df85d883c09826a812180813e Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Thu, 13 Feb 2025 13:34:36 +0800 Subject: [PATCH 2/5] refine name --- .../src/writer/base_writer/position_delete_file_writer.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs b/crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs index 0a44b2d637..997711deed 100644 --- a/crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs +++ b/crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs @@ -101,12 +101,12 @@ pub struct PositionDeleteWriter { #[async_trait::async_trait] impl IcebergWriter> for PositionDeleteWriter { - async fn write(&mut self, input: Vec) -> Result<()> { + async fn write(&mut self, inputs: Vec) -> Result<()> { let mut path_column_builder = StringBuilder::new(); let mut offset_column_builder = PrimitiveBuilder::::new(); - for input in input.into_iter() { - for offset in input.offsets { - path_column_builder.append_value(&input.path); + for pd_input in inputs.into_iter() { + for offset in pd_input.offsets { + path_column_builder.append_value(&pd_input.path); offset_column_builder.append_value(offset); } } From 256de49f8a4d23c507ca67797e7ccef1a5f15572 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Sun, 23 Feb 2025 14:21:33 +0800 Subject: [PATCH 3/5] refine input of writer --- .../position_delete_file_writer.rs | 136 +++++++++++------- 1 file changed, 84 insertions(+), 52 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs b/crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs index 997711deed..e956c9f3b4 100644 --- a/crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs +++ b/crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs @@ -16,6 +16,8 @@ // under the License. //! Position delete file writer. +use std::future::Future; +use std::pin::Pin; use std::sync::Arc; use arrow_array::builder::{PrimitiveBuilder, StringBuilder}; @@ -29,17 +31,21 @@ use crate::writer::file_writer::{FileWriter, FileWriterBuilder}; use crate::writer::{IcebergWriter, IcebergWriterBuilder}; use crate::{Error, ErrorKind, Result}; +const POS_DELETE_FIELD1_NAME: &str = "file_path"; +const POS_DELETE_FIELD1_ID: i32 = 2147483546; +const POS_DELETE_FIELD2_NAME: &str = "pos"; +const POS_DELETE_FIELD2_ID: i32 = 2147483545; static POSITION_DELETE_SCHEMA: Lazy = Lazy::new(|| { Schema::builder() .with_fields(vec![ Arc::new(NestedField::required( - 2147483546, - "file_path", + POS_DELETE_FIELD1_ID, + POS_DELETE_FIELD1_NAME, Type::Primitive(PrimitiveType::String), )), Arc::new(NestedField::required( - 2147483545, - "pos", + POS_DELETE_FIELD2_ID, + POS_DELETE_FIELD2_NAME, Type::Primitive(PrimitiveType::Long), )), ]) @@ -49,17 +55,17 @@ static POSITION_DELETE_SCHEMA: Lazy = Lazy::new(|| { /// Position delete input. #[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Debug)] -pub struct PositionDeleteInput { +pub struct PositionDeleteInput<'a> { /// The path of the file. - pub path: String, - /// The offset of the position delete. - pub offsets: Vec, + pub path: &'a str, + /// The row number in data file + pub pos: i64, } -impl PositionDeleteInput { +impl<'a> PositionDeleteInput<'a> { /// Create a new `PositionDeleteInput`. - pub fn new(path: String, offsets: Vec) -> Self { - PositionDeleteInput { path, offsets } + pub fn new(path: &'a str, row: i64) -> Self { + Self { path, pos: row } } } /// Builder for `MemoryPositionDeleteWriter`. @@ -80,7 +86,7 @@ impl PositionDeleteWriterBuilder { } #[async_trait::async_trait] -impl IcebergWriterBuilder> +impl<'a, B: FileWriterBuilder> IcebergWriterBuilder>> for PositionDeleteWriterBuilder { type R = PositionDeleteWriter; @@ -99,16 +105,22 @@ pub struct PositionDeleteWriter { partition_value: Struct, } -#[async_trait::async_trait] -impl IcebergWriter> for PositionDeleteWriter { - async fn write(&mut self, inputs: Vec) -> Result<()> { +impl<'a, B: FileWriterBuilder> IcebergWriter>> + for PositionDeleteWriter +{ + fn write<'life0, 'async_trait>( + &'life0 mut self, + input: Vec>, + ) -> Pin> + Send + 'async_trait>> + where + 'life0: 'async_trait, + Self: 'async_trait, + { let mut path_column_builder = StringBuilder::new(); let mut offset_column_builder = PrimitiveBuilder::::new(); - for pd_input in inputs.into_iter() { - for offset in pd_input.offsets { - path_column_builder.append_value(&pd_input.path); - offset_column_builder.append_value(offset); - } + for pd_input in input.into_iter() { + path_column_builder.append_value(pd_input.path); + offset_column_builder.append_value(pd_input.pos); } let record_batch = RecordBatch::try_new( Arc::new(schema_to_arrow_schema(&POSITION_DELETE_SCHEMA).unwrap()), @@ -117,28 +129,38 @@ impl IcebergWriter> for PositionD Arc::new(offset_column_builder.finish()), ], ) - .map_err(|e| Error::new(ErrorKind::DataInvalid, e.to_string()))?; + .map_err(|e| Error::new(ErrorKind::DataInvalid, e.to_string())); - if let Some(inner_writer) = &mut self.inner_writer { - inner_writer.write(&record_batch).await?; - } else { - return Err(Error::new(ErrorKind::Unexpected, "write has been closed")); - } - Ok(()) + Box::pin(async move { + if let Some(inner_writer) = &mut self.inner_writer { + inner_writer.write(&record_batch?).await?; + } else { + return Err(Error::new(ErrorKind::Unexpected, "write has been closed")); + } + Ok(()) + }) } - async fn close(&mut self) -> Result> { - let writer = self.inner_writer.take().unwrap(); - Ok(writer - .close() - .await? - .into_iter() - .map(|mut res| { - res.content(DataContentType::PositionDeletes); - res.partition(self.partition_value.clone()); - res.build().expect("Guaranteed to be valid") - }) - .collect()) + fn close<'life0, 'async_trait>( + &'life0 mut self, + ) -> Pin>> + Send + 'async_trait>> + where + 'life0: 'async_trait, + Self: 'async_trait, + { + Box::pin(async move { + let writer = self.inner_writer.take().unwrap(); + Ok(writer + .close() + .await? + .into_iter() + .map(|mut res| { + res.content(DataContentType::PositionDeletes); + res.partition(self.partition_value.clone()); + res.build().expect("Guaranteed to be valid") + }) + .collect()) + }) } } @@ -184,28 +206,38 @@ mod test { // Write some position delete inputs let inputs: Vec = vec![ PositionDeleteInput { - path: "file2.parquet".to_string(), - offsets: vec![2, 1, 3], + path: "file2.parquet", + pos: 2, + }, + PositionDeleteInput { + path: "file2.parquet", + pos: 1, + }, + PositionDeleteInput { + path: "file2.parquet", + pos: 3, + }, + PositionDeleteInput { + path: "file3.parquet", + pos: 2, + }, + PositionDeleteInput { + path: "file1.parquet", + pos: 5, }, PositionDeleteInput { - path: "file3.parquet".to_string(), - offsets: vec![2], + path: "file1.parquet", + pos: 4, }, PositionDeleteInput { - path: "file1.parquet".to_string(), - offsets: vec![5, 4, 1], + path: "file1.parquet", + pos: 1, }, ]; let expect_inputs = inputs .clone() .into_iter() - .flat_map(|input| { - input - .offsets - .iter() - .map(|off| (input.path.clone(), *off)) - .collect::>() - }) + .map(|input| (input.path.to_string(), input.pos)) .collect_vec(); position_delete_writer.write(inputs.clone()).await?; From 3174b425823357c5bca2cc16b664ee4d09c38802 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Tue, 25 Feb 2025 14:49:08 +0800 Subject: [PATCH 4/5] refine --- .../position_delete_file_writer.rs | 40 +++++++++++-------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs b/crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs index e956c9f3b4..89f12895a9 100644 --- a/crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs +++ b/crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs @@ -16,6 +16,11 @@ // under the License. //! Position delete file writer. +//! +//! This writer does not keep track of seen deletes and assumes all incoming records are ordered +//! by file and position as required by the spec. If there is no external process to order the +//! records, consider using SortingPositionDeleteWriter(WIP) instead. + use std::future::Future; use std::pin::Pin; use std::sync::Arc; @@ -26,29 +31,30 @@ use arrow_array::RecordBatch; use once_cell::sync::Lazy; use crate::arrow::schema_to_arrow_schema; -use crate::spec::{DataContentType, DataFile, NestedField, PrimitiveType, Schema, Struct, Type}; +use crate::spec::{ + DataContentType, DataFile, NestedField, NestedFieldRef, PrimitiveType, Schema, Struct, Type, +}; use crate::writer::file_writer::{FileWriter, FileWriterBuilder}; use crate::writer::{IcebergWriter, IcebergWriterBuilder}; use crate::{Error, ErrorKind, Result}; -const POS_DELETE_FIELD1_NAME: &str = "file_path"; -const POS_DELETE_FIELD1_ID: i32 = 2147483546; -const POS_DELETE_FIELD2_NAME: &str = "pos"; -const POS_DELETE_FIELD2_ID: i32 = 2147483545; +static DELETE_FILE_PATH: Lazy = Lazy::new(|| { + Arc::new(NestedField::required( + 2147483546, + "file_path", + Type::Primitive(PrimitiveType::String), + )) +}); +static DELETE_FILE_POS: Lazy = Lazy::new(|| { + Arc::new(NestedField::required( + 2147483545, + "pos", + Type::Primitive(PrimitiveType::Long), + )) +}); static POSITION_DELETE_SCHEMA: Lazy = Lazy::new(|| { Schema::builder() - .with_fields(vec![ - Arc::new(NestedField::required( - POS_DELETE_FIELD1_ID, - POS_DELETE_FIELD1_NAME, - Type::Primitive(PrimitiveType::String), - )), - Arc::new(NestedField::required( - POS_DELETE_FIELD2_ID, - POS_DELETE_FIELD2_NAME, - Type::Primitive(PrimitiveType::Long), - )), - ]) + .with_fields(vec![DELETE_FILE_PATH.clone(), DELETE_FILE_POS.clone()]) .build() .unwrap() }); From 09a56e8b2af20d73b0d0d76e80c8a71769dd40db Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Thu, 27 Feb 2025 12:09:10 +0800 Subject: [PATCH 5/5] refine lifetime --- .../position_delete_file_writer.rs | 86 +++++++------------ 1 file changed, 33 insertions(+), 53 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs b/crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs index 89f12895a9..fedec5acc6 100644 --- a/crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs +++ b/crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs @@ -21,8 +21,6 @@ //! by file and position as required by the spec. If there is no external process to order the //! records, consider using SortingPositionDeleteWriter(WIP) instead. -use std::future::Future; -use std::pin::Pin; use std::sync::Arc; use arrow_array::builder::{PrimitiveBuilder, StringBuilder}; @@ -61,16 +59,16 @@ static POSITION_DELETE_SCHEMA: Lazy = Lazy::new(|| { /// Position delete input. #[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Debug)] -pub struct PositionDeleteInput<'a> { +pub struct PositionDeleteInput { /// The path of the file. - pub path: &'a str, + pub path: Arc, /// The row number in data file pub pos: i64, } -impl<'a> PositionDeleteInput<'a> { +impl PositionDeleteInput { /// Create a new `PositionDeleteInput`. - pub fn new(path: &'a str, row: i64) -> Self { + pub fn new(path: Arc, row: i64) -> Self { Self { path, pos: row } } } @@ -92,7 +90,7 @@ impl PositionDeleteWriterBuilder { } #[async_trait::async_trait] -impl<'a, B: FileWriterBuilder> IcebergWriterBuilder>> +impl IcebergWriterBuilder> for PositionDeleteWriterBuilder { type R = PositionDeleteWriter; @@ -111,17 +109,9 @@ pub struct PositionDeleteWriter { partition_value: Struct, } -impl<'a, B: FileWriterBuilder> IcebergWriter>> - for PositionDeleteWriter -{ - fn write<'life0, 'async_trait>( - &'life0 mut self, - input: Vec>, - ) -> Pin> + Send + 'async_trait>> - where - 'life0: 'async_trait, - Self: 'async_trait, - { +#[async_trait::async_trait] +impl IcebergWriter> for PositionDeleteWriter { + async fn write(&mut self, input: Vec) -> Result<()> { let mut path_column_builder = StringBuilder::new(); let mut offset_column_builder = PrimitiveBuilder::::new(); for pd_input in input.into_iter() { @@ -137,36 +127,26 @@ impl<'a, B: FileWriterBuilder> IcebergWriter>> ) .map_err(|e| Error::new(ErrorKind::DataInvalid, e.to_string())); - Box::pin(async move { - if let Some(inner_writer) = &mut self.inner_writer { - inner_writer.write(&record_batch?).await?; - } else { - return Err(Error::new(ErrorKind::Unexpected, "write has been closed")); - } - Ok(()) - }) + if let Some(inner_writer) = &mut self.inner_writer { + inner_writer.write(&record_batch?).await?; + } else { + return Err(Error::new(ErrorKind::Unexpected, "write has been closed")); + } + Ok(()) } - fn close<'life0, 'async_trait>( - &'life0 mut self, - ) -> Pin>> + Send + 'async_trait>> - where - 'life0: 'async_trait, - Self: 'async_trait, - { - Box::pin(async move { - let writer = self.inner_writer.take().unwrap(); - Ok(writer - .close() - .await? - .into_iter() - .map(|mut res| { - res.content(DataContentType::PositionDeletes); - res.partition(self.partition_value.clone()); - res.build().expect("Guaranteed to be valid") - }) - .collect()) - }) + async fn close(&mut self) -> Result> { + let writer = self.inner_writer.take().unwrap(); + Ok(writer + .close() + .await? + .into_iter() + .map(|mut res| { + res.content(DataContentType::PositionDeletes); + res.partition(self.partition_value.clone()); + res.build().expect("Guaranteed to be valid") + }) + .collect()) } } @@ -212,31 +192,31 @@ mod test { // Write some position delete inputs let inputs: Vec = vec![ PositionDeleteInput { - path: "file2.parquet", + path: "file2.parquet".into(), pos: 2, }, PositionDeleteInput { - path: "file2.parquet", + path: "file2.parquet".into(), pos: 1, }, PositionDeleteInput { - path: "file2.parquet", + path: "file2.parquet".into(), pos: 3, }, PositionDeleteInput { - path: "file3.parquet", + path: "file3.parquet".into(), pos: 2, }, PositionDeleteInput { - path: "file1.parquet", + path: "file1.parquet".into(), pos: 5, }, PositionDeleteInput { - path: "file1.parquet", + path: "file1.parquet".into(), pos: 4, }, PositionDeleteInput { - path: "file1.parquet", + path: "file1.parquet".into(), pos: 1, }, ];