How to merge multiple data sources and deduplicate based on certain fields? #12532

Open
wj-stack opened this issue Sep 19, 2024 · 0 comments
Labels
enhancement New feature or request

Comments

@wj-stack

Is your feature request related to a problem or challenge?

I currently have two data sources, one stored in Parquet format and the other in memory, and I need to implement a scan function that reads from both. I tried combining them with UnionExec, but it does not work correctly, especially with aggregate functions such as count. Maybe I should use SortPreservingMergeExec instead, but there are very few examples of how to use it. The two sources may contain duplicate rows, so I would appreciate an example that merges multiple data sources and deduplicates them based on multiple fields.
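As an illustration of the requested behavior, here is a minimal sketch in plain Rust (standard library only, not DataFusion API). The `Row` type, the `device_id` field, and the `merge_dedup` helper are hypothetical names invented for this example; only `show_time` comes from the code in this issue. It shows why a plain concatenation of two sources, which is what a union without deduplication amounts to, overcounts when both sources hold the same key.

```rust
use std::collections::HashSet;

/// Hypothetical row type. `show_time` mirrors the column used in the issue;
/// `device_id` is an invented second key field for illustration.
#[derive(Debug, Clone, PartialEq)]
struct Row {
    show_time: i64,
    device_id: u32,
    value: f64,
}

/// Concatenate both sources and keep only the first row seen for each
/// (show_time, device_id) pair. Rows from `memory` are visited first,
/// so they win over `parquet` rows with the same key.
fn merge_dedup(memory: &[Row], parquet: &[Row]) -> Vec<Row> {
    let mut seen = HashSet::new();
    memory
        .iter()
        .chain(parquet.iter())
        // `insert` returns false when the key was already present.
        .filter(|r| seen.insert((r.show_time, r.device_id)))
        .cloned()
        .collect()
}

fn main() {
    let memory = vec![
        Row { show_time: 1000, device_id: 1, value: 1.5 },
        Row { show_time: 2000, device_id: 1, value: 2.5 },
    ];
    let parquet = vec![
        Row { show_time: 1000, device_id: 1, value: 1.0 }, // same key as a memory row
        Row { show_time: 1000, device_id: 2, value: 9.0 },
    ];

    // Counting over the plain concatenation includes the duplicate.
    let naive_count = memory.len() + parquet.len();
    let merged = merge_dedup(&memory, &parquet);

    // prints "naive count = 4, deduplicated count = 3"
    println!("naive count = {naive_count}, deduplicated count = {}", merged.len());
}
```

The hash set needs memory proportional to the number of distinct keys; when both inputs are already sorted on the key, a streaming merge avoids that cost.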

    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // convert filters like [`a = 1`, `b = 2`] to a single filter like `a = 1 AND b = 2`
        let predicate = self.filters_to_predicate(state, filters)?;

        // Now, we invoke the analysis code to perform the range analysis
        let df_schema = DFSchema::try_from(self.schema())?;

        let boundaries = ExprBoundaries::try_new_unbounded(&self.schema())?;

        let analysis_result = analyze(
            &predicate,
            AnalysisContext::new(boundaries),
            df_schema.as_ref(),
        )?;

        // In this example, we use the PruningPredicate's literal guarantees to
        // analyze the predicate. In a real system, using
        // `PruningPredicate::prune` would likely be easier to do.
        let pruning_predicate =
            PruningPredicate::try_new(Arc::clone(&predicate), self.schema().clone())?;

        debug!("pruning_predicate:{:?}", pruning_predicate);

        // The PruningPredicate's guarantees must all be satisfied in order for
        // the predicate to possibly evaluate to true.
        let guarantees = pruning_predicate.literal_guarantees();

        debug!("guarantees:{:?}", guarantees);

        let object_store_url = ObjectStoreUrl::parse("file://")?;
        let mut file_scan_config = FileScanConfig::new(object_store_url, self.schema())
            .with_projection(projection.cloned())
            .with_limit(limit);

        let mut points = vec![];

        for expr in &analysis_result.boundaries {
            if expr.column.name() == "show_time" {
                let lower = expr.interval.lower().clone().to_array().unwrap();
                let lower = lower
                    .as_any()
                    .downcast_ref::<TimestampMillisecondArray>()
                    .unwrap();

                let upper = expr.interval.upper().clone().to_array().unwrap();
                let upper = upper
                    .as_any()
                    .downcast_ref::<TimestampMillisecondArray>()
                    .unwrap();

                debug!(
                    "{:?} lower:{:?} upper:{:?}",
                    expr.column,
                    lower.value(0),
                    upper.value(0)
                );

                let start = DateTime::<Utc>::from_timestamp_millis(lower.value(0)).unwrap();
                let end = DateTime::<Utc>::from_timestamp_millis(upper.value(0)).unwrap();

                let p = self.query_with_time(start, end).await;

                if p.len() != 0 {
                    let addr = { self.fields.read().await.clone() };
                    let additional = { self.additional.read().await.clone() };

                    let batch = create_record_batch(&p, &addr, &additional).unwrap();
                    points.push(batch);
                }

                let dirs = self.get_dir_by_time(start, end).unwrap_or_default();

                info!("dirs: {:?}  {:?} {:?}", start, end, dirs);

                for s in dirs {
                    let mut files = list_files_in_directory(&s).unwrap();
                    files.reverse();

                    let mut v = vec![];

                    for s in files {
                        if s.extension().unwrap() == "parquet" {
                            let f = self
                                .files
                                .optionally_get_with(format!("{:?}", s), async {
                                    let f = add_file(Path::new(&s)).unwrap();
                                    Some((String::from(f.0), f.1))
                                })
                                .await
                                .unwrap();

                            v.push(PartitionedFile::new(f.0.clone(), f.1));
                        }
                    }
                    file_scan_config = file_scan_config.with_file_group(v);
                }

                break;
            }
        }

        let exec: Arc<ParquetExec> = ParquetExec::builder(file_scan_config)
            .with_predicate(predicate)
            .build_arc();

        // Problem: aggregations such as count return incorrect results
        // on the plan built below.

        let memory =
            MemoryExec::try_new(&vec![points], self.schema(), projection.cloned()).unwrap();

        let union = Arc::new(UnionExec::new(vec![Arc::new(memory), exec]));

        Ok(union)
    }
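Since SortPreservingMergeExec is mentioned as an alternative, the following sketch shows the sorted-merge idea with deduplication in plain Rust (standard library only). It is not DataFusion code, and as far as I understand a sort-preserving merge on its own only interleaves sorted inputs and would not drop duplicates, so the duplicate handling here is an added step. The `Key` alias, the `device_id` component, and `sorted_merge_dedup` are hypothetical names, and the sketch assumes each input is already sorted by the key and has no duplicates within itself.

```rust
use std::cmp::Ordering;

/// Hypothetical composite key: (show_time in milliseconds, device_id).
type Key = (i64, u32);

/// Merge two inputs that are each sorted by `Key` and free of internal
/// duplicates. When both sides hold the same key, the `memory` row is kept
/// and the `parquet` row is skipped.
fn sorted_merge_dedup(memory: &[(Key, f64)], parquet: &[(Key, f64)]) -> Vec<(Key, f64)> {
    let mut out = Vec::with_capacity(memory.len() + parquet.len());
    let (mut i, mut j) = (0, 0);
    while i < memory.len() && j < parquet.len() {
        match memory[i].0.cmp(&parquet[j].0) {
            Ordering::Less => {
                out.push(memory[i]);
                i += 1;
            }
            Ordering::Greater => {
                out.push(parquet[j]);
                j += 1;
            }
            Ordering::Equal => {
                // Same key on both sides: emit once, advance both cursors.
                out.push(memory[i]);
                i += 1;
                j += 1;
            }
        }
    }
    // At most one of these tails is non-empty.
    out.extend_from_slice(&memory[i..]);
    out.extend_from_slice(&parquet[j..]);
    out
}

fn main() {
    let memory = [((1000, 1), 1.5), ((2000, 1), 2.5)];
    let parquet = [((1000, 1), 1.0), ((1000, 2), 9.0), ((3000, 1), 4.0)];

    let merged = sorted_merge_dedup(&memory, &parquet);
    for (key, value) in &merged {
        println!("{key:?} -> {value}");
    }
    // Four rows remain; the key (1000, 1) appears once with the memory value 1.5.
    println!("count = {}", merged.len());
}
```

Compared with a hash-based approach, this keeps the output sorted and uses constant extra state, at the price of requiring sorted inputs.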

Describe the solution you'd like

No response

Describe alternatives you've considered

No response

Additional context

No response

@wj-stack wj-stack added the enhancement New feature or request label Sep 19, 2024