ongoing changes to handle pdf files
pcdeadeasy committed Feb 5, 2025
1 parent af5f1dd commit abb7367
Showing 9 changed files with 2,056 additions and 0 deletions.
ts/examples/docuProc/src/pdfChunker.py: 146 additions & 0 deletions
@@ -0,0 +1,146 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import subprocess
import sys

# Ensure required dependencies are installed (pip package name -> import name;
# PyMuPDF is imported as "fitz" below).
required_packages = {"pdfplumber": "pdfplumber", "pymupdf": "fitz"}
for package, module_name in required_packages.items():
    try:
        __import__(module_name)
    except ImportError:
        print(f"Installing missing package: {package}...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])

import pdfplumber # type: ignore
import fitz # type: ignore
import json
import os
import datetime
import csv
from dataclasses import dataclass
from typing import List, Dict, Any, Optional

IdType = str

@dataclass
class Blob:
"""A sequence of text, table, or image data plus metadata."""
type: str # "text", "table", "image"
    content: Any  # Text (list of lines), table CSV path (str), or image path (str)
start: int # Page number (0-based)
bbox: Optional[List[float]] = None # Bounding box if applicable

def to_dict(self) -> Dict[str, Any]:
result = {
"type": self.type,
"content": self.content,
"start": self.start,
}
if self.bbox:
result["bbox"] = self.bbox
return result

@dataclass
class Chunk:
"""A chunk at any level of nesting (root, inner, leaf)."""
id: IdType
pageid: IdType
blobs: List[Blob] # Blobs around the placeholders
parentId: IdType
children: List[IdType] # len() is one less than len(blobs)

def to_dict(self) -> Dict[str, object]:
return {
"id": self.id,
"pageid": self.pageid,
"blobs": [blob.to_dict() for blob in self.blobs],
"parentId": self.parentId,
"children": self.children,
}

@dataclass
class ChunkedFile:
"""A file with extracted chunks."""
file_name: str
chunks: List[Chunk]

def to_dict(self) -> Dict[str, Any]:
return {
"file_name": self.file_name,
"chunks": [chunk.to_dict() for chunk in self.chunks],
}

class PDFChunker:
def __init__(self, file_path: str, output_dir: str = "output"):
self.file_path = file_path
self.output_dir = output_dir
os.makedirs(output_dir, exist_ok=True)

def generate_id(self) -> str:
return datetime.datetime.now().strftime("%Y%m%d-%H%M%S.%f")

def extract_text_chunks(self, pdf, by_paragraph: bool = True) -> List[Chunk]:
chunks = []
for page_num, page in enumerate(pdf.pages):
text = page.extract_text()
if text:
lines = text.split("\n")
chunk_id = self.generate_id()
if by_paragraph:
paragraphs = text.split("\n\n")
blobs = [Blob("text", para.split("\n"), page_num) for para in paragraphs]
else:
blobs = [Blob("text", lines, page_num)]
chunks.append(Chunk(chunk_id, str(page_num), blobs, "", []))
return chunks

def extract_tables(self, pdf) -> List[Chunk]:
chunks = []
for page_num, page in enumerate(pdf.pages):
tables = page.extract_tables()
for table in tables:
table_path = os.path.join(self.output_dir, f"table_{page_num}_{self.generate_id()}.csv")
                with open(table_path, "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerows(table)
chunk_id = self.generate_id()
blobs = [Blob("table", table_path, page_num)]
chunks.append(Chunk(chunk_id, str(page_num), blobs, "", []))
return chunks

    def extract_images(self) -> List[Chunk]:
        chunks = []
        doc = fitz.open(self.file_path)
        for page_num in range(len(doc)):
            page = doc[page_num]
            for img_index, img in enumerate(page.get_images(full=True)):
                xref = img[0]
                pix = fitz.Pixmap(doc, xref)
                # CMYK and other exotic colorspaces cannot be written to PNG directly.
                if pix.n - pix.alpha >= 4:
                    pix = fitz.Pixmap(fitz.csRGB, pix)
                img_path = os.path.join(self.output_dir, f"image_{page_num}_{img_index}.png")
                pix.save(img_path)
                # get_image_bbox expects the item tuple from get_images(full=True), not the xref.
                bbox = list(page.get_image_bbox(img))
                chunk_id = self.generate_id()
                blobs = [Blob("image", img_path, page_num, bbox)]
                chunks.append(Chunk(chunk_id, str(page_num), blobs, "", []))
        doc.close()
        return chunks

def chunkify(self, by_paragraph: bool = True) -> ChunkedFile:
with pdfplumber.open(self.file_path) as pdf:
text_chunks = self.extract_text_chunks(pdf, by_paragraph)
table_chunks = self.extract_tables(pdf)
image_chunks = self.extract_images()
all_chunks = text_chunks + table_chunks + image_chunks
return ChunkedFile(self.file_path, all_chunks)

def save_json(self, output_path: str, by_paragraph: bool = True):
chunked_file = self.chunkify(by_paragraph)
with open(output_path, "w") as f:
json.dump(chunked_file.to_dict(), f, indent=2)

# Example usage
if __name__ == "__main__":
pdf_path = "sample.pdf"
output_json = "output.json"
chunker = PDFChunker(pdf_path)
chunker.save_json(output_json)
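
For reference, here is a minimal sketch of the JSON that save_json would produce for a one-page PDF containing a single text blob, based on the to_dict methods above; the file name and id values are illustrative only.

# Illustrative only: approximate shape of the output of save_json, assuming a
# one-page PDF with one text blob. Ids are timestamps from generate_id(), and
# "bbox" appears only on blobs that set it (e.g. image blobs).
example_output = {
    "file_name": "sample.pdf",
    "chunks": [
        {
            "id": "20250205-101500.123456",
            "pageid": "0",
            "blobs": [
                {"type": "text", "content": ["First line", "Second line"], "start": 0}
            ],
            "parentId": "",
            "children": [],
        }
    ],
}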
ts/examples/docuProc/src/pdfChunker.ts: 186 additions & 0 deletions
@@ -0,0 +1,186 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

// This requires that python3 is on the PATH
// and the pdfChunker.py script is in the dist directory.

import { exec } from "child_process";
import path from "path";
import { fileURLToPath } from "url";
import { promisify } from "util";

import { PdfFileDocumentation } from "./pdfDocChunkSchema.js";

const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);

const execPromise = promisify(exec);

export type ChunkId = string;

export interface Blob {
start: number; // int; 0-based!
lines: string[];
breadcrumb?: boolean;
}

export interface Chunk {
// Names here must match names in pdfChunker.py.
id: ChunkId;
treeName: string;
blobs: Blob[];
parentId: ChunkId;
children: ChunkId[];
    fileName: string; // Set on the receiving end from ChunkedFile.fileName.
docs?: PdfFileDocumentation; // Computed later by fileDocumenter.
}

export interface ChunkedFile {
fileName: string;
chunks: Chunk[];
}

export interface ErrorItem {
error: string;
filename?: string;
output?: string;
}

export async function chunkifyPdfFiles(
filenames: string[],
): Promise<(ChunkedFile | ErrorItem)[]> {
let output,
errors,
success = false;
try {
const chunkerPath = path.join(__dirname, "pdfChunker.py");
        let { stdout, stderr } = await execPromise(
            // Quote paths so spaces in file names do not break the command line.
            `python3 -X utf8 "${chunkerPath}" ${filenames.map((f) => `"${f}"`).join(" ")}`,
            { maxBuffer: 64 * 1024 * 1024 }, // Super large buffer
        );
output = stdout;
errors = stderr;
success = true;
} catch (error: any) {
output = error?.stdout || "";
errors = error?.stderr || error.message || "Unknown error";
}

if (!success) {
return [{ error: errors, output: output }];
}
if (errors) {
return [{ error: errors, output: output }];
}
if (!output) {
return [{ error: "No output from chunker script" }];
}

const results: (ChunkedFile | ErrorItem)[] = JSON.parse(output);
// TODO: validate that JSON matches our schema.

// Ensure all chunks have a filename.
for (const result of results) {
if (!("error" in result)) {
for (const chunk of result.chunks) {
chunk.fileName = result.fileName;
}
}
}
return splitLargeFiles(results);
}

const CHUNK_COUNT_LIMIT = 25; // How many chunks at most.
const FILE_SIZE_LIMIT = 25000; // How many characters at most.

function splitLargeFiles(
items: (ChunkedFile | ErrorItem)[],
): (ChunkedFile | ErrorItem)[] {
const results: (ChunkedFile | ErrorItem)[] = [];
for (const item of items) {
if (
"error" in item ||
(item.chunks.length <= CHUNK_COUNT_LIMIT &&
fileSize(item) <= FILE_SIZE_LIMIT)
) {
results.push(item);
} else {
results.push(...splitFile(item));
}
}
return results;
}

// This algorithm is too complex. I needed a debugger and logging to get it right.
function splitFile(file: ChunkedFile): ChunkedFile[] {
const fileName = file.fileName;
const parentMap: Map<ChunkId, Chunk> = new Map();
for (const chunk of file.chunks) {
// Only nodes with children will be looked up in this map.
if (chunk.children.length) parentMap.set(chunk.id, chunk);
}

const results: ChunkedFile[] = []; // Where output accumulates.
let chunks = Array.from(file.chunks); // The chunks yet to emit.
let minNumChunks = 1;

outer: while (true) {
// Keep going until we exit the inner loop.
let totalSize = 0; // Size in characters of chunks to be output.
for (let i = 0; i < chunks.length; i++) {
// Iterate in pre-order.
const currentChunk = chunks[i];
const size = chunkSize(currentChunk);
if (
i < minNumChunks ||
(i < CHUNK_COUNT_LIMIT && totalSize + size <= FILE_SIZE_LIMIT)
) {
totalSize += size;
continue;
}

// Split the file here (current chunk goes into ancestors).
const rest = chunks.splice(i);
if (rest.shift() !== currentChunk)
throw Error(
"Internal error: expected current chunk at head of rest",
);
results.push({ fileName, chunks });
const ancestors: Chunk[] = [];

let c: Chunk | undefined = currentChunk;
do {
ancestors.unshift(c);
c = parentMap.get(c.parentId);
} while (c);
// Note that the current chunk is the last ancestor.
chunks = [...ancestors, ...rest];
minNumChunks = ancestors.length;
continue outer;
}
// Append the final chunk.
results.push({ fileName, chunks });
break;
}
// console.log(
// `Split ${file.fileName} (${file.chunks.length} chunks) into ${results.length} files.`,
// );
// console.log(`Sizes: ${results.map((f) => f.chunks.length).join(", ")}`);
return results;
}

function fileSize(file: ChunkedFile): number {
return file.chunks.reduce((acc, chunk) => acc + chunkSize(chunk), 0);
}

function chunkSize(chunk: Chunk): number {
let totalCharacters = 0;
for (const blob of chunk.blobs) {
if (!blob.breadcrumb) {
for (const line of blob.lines) {
totalCharacters += line.length;
}
}
}
return totalCharacters;
}
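
For context, a minimal usage sketch of the chunkifyPdfFiles entry point defined above; the calling module and the file path are hypothetical placeholders, and results are narrowed with the "error" key just as splitLargeFiles does.

// Hypothetical caller: chunk one PDF and report per-file chunk counts.
// The path "docs/report.pdf" is a placeholder.
import { chunkifyPdfFiles } from "./pdfChunker.js";

async function main(): Promise<void> {
    const results = await chunkifyPdfFiles(["docs/report.pdf"]);
    for (const result of results) {
        if ("error" in result) {
            console.error(`Chunking failed: ${result.error}`);
        } else {
            console.log(`${result.fileName}: ${result.chunks.length} chunks`);
        }
    }
}

main().catch((err) => console.error(err));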