Skip to content

Commit

Permalink
AWS S3: Add S3.listBuckets (#2899)
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich authored Sep 5, 2022
1 parent 75340d9 commit 4ebf24a
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 0 deletions.
17 changes: 17 additions & 0 deletions s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,23 @@ import scala.concurrent.{ExecutionContext, Future}

private final val BucketPattern = "{bucket}"

def listBuckets(headers: Seq[HttpHeader] = Nil)(implicit conf: S3Settings): HttpRequest = {
val awsHost = Authority(Uri.Host(s"s3.amazonaws.com"))
val (authority, scheme) = conf.endpointUrl match {
case Some(endpointUrl) =>
val customUri = Uri(endpointUrl)
(customUri.authority, customUri.scheme)
case None =>
(awsHost, "https")
}

val uri = Uri(authority = authority, scheme = scheme)

HttpRequest(HttpMethods.GET)
.withHeaders(Host(awsHost) +: headers)
.withUri(uri)
}

def listBucket(
bucket: String,
prefix: Option[String] = None,
Expand Down
21 changes: 21 additions & 0 deletions s3/src/main/scala/akka/stream/alpakka/s3/impl/Marshalling.scala
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,27 @@ import scala.xml.NodeSeq
}
}

implicit val listBucketsResultUnmarshaller: FromEntityUnmarshaller[ListBucketsResult] = {
nodeSeqUnmarshaller(MediaTypes.`application/xml` withCharset HttpCharsets.`UTF-8`).map {
case NodeSeq.Empty => throw Unmarshaller.NoContentException
case x =>
val bucketsRoot = (x \\ "Buckets").headOption
.map { br =>
(br \\ "Bucket").map { u =>
val creationDate = Instant.parse((u \ "CreationDate").text)
val name = (u \ "Name").text

ListBucketsResultContents(
creationDate,
name
)
}
}
.getOrElse(Nil)
ListBucketsResult(bucketsRoot)
}
}

implicit val listMultipartUploadsResultUnmarshaller: FromEntityUnmarshaller[ListMultipartUploadsResult] = {
nodeSeqUnmarshaller(MediaTypes.`application/xml` withCharset HttpCharsets.`UTF-8`).map {
case NodeSeq.Empty => throw Unmarshaller.NoContentException
Expand Down
22 changes: 22 additions & 0 deletions s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ import scala.util.{Failure, Success, Try}
eTag: String,
versionId: Option[String] = None)

/** Internal Api */
@InternalApi private[impl] final case class ListBucketsResult(buckets: Seq[ListBucketsResultContents])

/** Internal Api */
@InternalApi private[impl] final case class ListBucketResult(isTruncated: Boolean,
continuationToken: Option[String],
Expand Down Expand Up @@ -321,6 +324,25 @@ import scala.util.{Failure, Success, Try}
.mapMaterializedValue(_ => NotUsed)
}

def listBuckets(s3Headers: S3Headers): Source[ListBucketsResultContents, NotUsed] =
Source
.fromMaterializer { (mat, attr) =>
implicit val materializer: Materializer = mat
implicit val attributes: Attributes = attr
implicit val conf: S3Settings = resolveSettings(attr, mat.system)

Source
.future {
signAndGetAs[ListBucketsResult](
HttpRequests.listBuckets(s3Headers.headers)
).map { (res: ListBucketsResult) =>
res.buckets
}(ExecutionContexts.parasitic)
}
.flatMapConcat(results => Source(results))
}
.mapMaterializedValue(_ => NotUsed)

type ListMultipartUploadState = S3PaginationState[ListMultipartUploadContinuationToken]

def listMultipartUploadCall[T](
Expand Down
20 changes: 20 additions & 0 deletions s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3.scala
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,26 @@ object S3 {
)
}

/**
* Will return a list containing all of the buckets for the current AWS account
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListBuckets.html
* @return [[akka.stream.javadsl.Source Source]] of [[ListBucketsResultContents]]
*/
def listBuckets(): Source[ListBucketsResultContents, NotUsed] =
listBuckets(S3Headers.empty)

/**
* Will return a list containing all of the buckets for the current AWS account
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListBuckets.html
* @return [[akka.stream.javadsl.Source Source]] of [[ListBucketsResultContents]]
*/
def listBuckets(s3Headers: S3Headers): Source[ListBucketsResultContents, NotUsed] =
S3Stream
.listBuckets(s3Headers)
.asJava

/**
* Will return a source of object metadata for a given bucket with optional prefix using version 2 of the List Bucket API.
* This will automatically page through all keys with the given parameters.
Expand Down
58 changes: 58 additions & 0 deletions s3/src/main/scala/akka/stream/alpakka/s3/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,64 @@ object FailedUpload {
def create(reasons: Seq[Throwable]): FailedUpload = FailedUpload(reasons)
}

final class ListBucketsResultContents private (val creationDate: java.time.Instant, val name: String) {

/** Java API */
def getCreationDate: java.time.Instant = creationDate

/** Java API */
def getName: String = name

def withCreationDate(value: java.time.Instant): ListBucketsResultContents = copy(creationDate = value)

def withName(value: String): ListBucketsResultContents = copy(name = value)

private def copy(
name: String = name,
creationDate: java.time.Instant = creationDate
): ListBucketsResultContents = new ListBucketsResultContents(
name = name,
creationDate = creationDate
)

override def toString: String =
"ListBucketsResultContents(" +
s"creationDate=$creationDate," +
s"name=$name" +
")"

override def equals(other: Any): Boolean = other match {
case that: ListBucketsResultContents =>
Objects.equals(this.name, that.name) &&
Objects.equals(this.creationDate, that.creationDate)
case _ => false
}

override def hashCode(): Int =
Objects.hash(name, creationDate)
}

object ListBucketsResultContents {

/** Scala API */
def apply(
creationDate: java.time.Instant,
name: String
): ListBucketsResultContents = new ListBucketsResultContents(
creationDate,
name
)

/** Java API */
def create(
creationDate: java.time.Instant,
name: String
): ListBucketsResultContents = apply(
creationDate,
name
)
}

/**
* @param bucketName The name of the bucket in which this object is stored
* @param key The key under which this object is stored
Expand Down
19 changes: 19 additions & 0 deletions s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3.scala
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,25 @@ object S3 {
): Source[ByteString, Future[ObjectMetadata]] =
S3Stream.getObject(S3Location(bucket, key), range, versionId, s3Headers)

/**
* Will return a list containing all of the buckets for the current AWS account
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListBuckets.html
* @return [[akka.stream.scaladsl.Source Source]] of [[ListBucketsResultContents]]
*/
def listBuckets(): Source[ListBucketsResultContents, NotUsed] =
listBuckets(S3Headers.empty)

/**
* Will return a list containing all of the buckets for the current AWS account
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListBuckets.html
* @param s3Headers any headers you want to add
* @return [[akka.stream.scaladsl.Source Source]] of [[ListBucketsResultContents]]
*/
def listBuckets(s3Headers: S3Headers): Source[ListBucketsResultContents, NotUsed] =
S3Stream.listBuckets(s3Headers)

/**
* Will return a source of object metadata for a given bucket with optional prefix using version 2 of the List Bucket API.
* This will automatically page through all keys with the given parameters.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@ trait S3IntegrationSpec
def defaultRegionContentCount = 4
def otherRegionContentCount = 5

it should "list buckets in current aws account" in {
val result = for {
buckets <- S3.listBuckets().withAttributes(attributes).runWith(Sink.seq)
} yield buckets

val buckets = result.futureValue

buckets.map(_.name) should contain(defaultBucket)
}

it should "list with real credentials" in {
val result = S3
.listBucket(defaultBucket, None)
Expand Down

0 comments on commit 4ebf24a

Please sign in to comment.