From cf48d55538bc25d94d5792fb572a38fc371685bc Mon Sep 17 00:00:00 2001 From: sfali Date: Wed, 28 Aug 2024 19:11:36 -0400 Subject: [PATCH] updated implementation to get headers from dsl #3253 1. DSL to create headers based on function and pass it to implementation 2. Remove "bobType" from "putBlob", separate functions will be introduced for page and append blocks --- .../storage/impl/AzureStorageStream.scala | 114 ++++++------------ .../azure/storage/javadsl/BlobService.scala | 57 ++++++--- .../azure/storage/javadsl/FileService.scala | 85 ++++++++++--- .../azure/storage/scaladsl/BlobService.scala | 31 +++-- .../azure/storage/scaladsl/FileService.scala | 54 +++++++-- .../test/java/docs/javadsl/StorageTest.java | 2 +- 6 files changed, 216 insertions(+), 127 deletions(-) diff --git a/azure-storage/src/main/scala/akka/stream/alpakka/azure/storage/impl/AzureStorageStream.scala b/azure-storage/src/main/scala/akka/stream/alpakka/azure/storage/impl/AzureStorageStream.scala index 8656934980..900f433b7f 100644 --- a/azure-storage/src/main/scala/akka/stream/alpakka/azure/storage/impl/AzureStorageStream.scala +++ b/azure-storage/src/main/scala/akka/stream/alpakka/azure/storage/impl/AzureStorageStream.scala @@ -12,17 +12,9 @@ import akka.actor.ActorSystem import akka.dispatch.ExecutionContexts import akka.http.scaladsl.Http import akka.http.scaladsl.model.StatusCodes.{Accepted, Created, NotFound, OK} -import akka.http.scaladsl.model.headers.{ - ByteRange, - CustomHeader, - RawHeader, - `Content-Length`, - `Content-Type`, - Range => RangeHeader -} +import akka.http.scaladsl.model.headers.{`Content-Length`, `Content-Type`, CustomHeader} import akka.http.scaladsl.model.{ ContentType, - HttpEntity, HttpHeader, HttpMethod, HttpMethods, @@ -30,6 +22,7 @@ import akka.http.scaladsl.model.{ HttpResponse, ResponseEntity, StatusCode, + UniversalEntity, Uri } import akka.http.scaladsl.unmarshalling.Unmarshal @@ -46,9 +39,8 @@ object AzureStorageStream { private[storage] def getObject(storageType: String, objectPath: String, - range: Option[ByteRange], versionId: Option[String], - leaseId: Option[String]): Source[ByteString, Future[ObjectMetadata]] = { + headers: Seq[HttpHeader]): Source[ByteString, Future[ObjectMetadata]] = { Source .fromMaterializer { (mat, attr) => implicit val system: ActorSystem = mat.system @@ -60,7 +52,7 @@ object AzureStorageStream { storageType = storageType, objectPath = objectPath, queryString = createQueryString(settings, versionId.map(value => s"versionId=$value"))), - headers = populateCommonHeaders(HttpEntity.Empty, range = range, leaseId = leaseId) + headers = headers ) val objectMetadataMat = Promise[ObjectMetadata]() signAndRequest(request, settings)(mat.system) @@ -84,7 +76,7 @@ object AzureStorageStream { private[storage] def getObjectProperties(storageType: String, objectPath: String, versionId: Option[String], - leaseId: Option[String]): Source[Option[ObjectMetadata], NotUsed] = { + headers: Seq[HttpHeader]): Source[Option[ObjectMetadata], NotUsed] = { Source .fromMaterializer { (mat, attr) => implicit val system: ActorSystem = mat.system @@ -97,7 +89,7 @@ object AzureStorageStream { storageType = storageType, objectPath = objectPath, queryString = createQueryString(settings, versionId.map(value => s"versionId=$value"))), - headers = populateCommonHeaders(HttpEntity.Empty, leaseId = leaseId) + headers = headers ) signAndRequest(request, settings) @@ -117,7 +109,7 @@ object AzureStorageStream { private[storage] def deleteObject(storageType: String, objectPath: String, versionId: Option[String], - leaseId: Option[String]): Source[Option[ObjectMetadata], NotUsed] = { + headers: Seq[HttpHeader]): Source[Option[ObjectMetadata], NotUsed] = { Source .fromMaterializer { (mat, attr) => implicit val system: ActorSystem = mat.system @@ -130,7 +122,7 @@ object AzureStorageStream { storageType = storageType, objectPath = objectPath, queryString = createQueryString(settings, versionId.map(value => s"versionId=$value"))), - headers = populateCommonHeaders(HttpEntity.Empty, leaseId = leaseId) + headers = headers ) signAndRequest(request, settings) @@ -147,17 +139,13 @@ object AzureStorageStream { .mapMaterializedValue(_ => NotUsed) } - private[storage] def putBlob(blobType: String, - objectPath: String, - contentType: ContentType, - contentLength: Long, - payload: Source[ByteString, _], - leaseId: Option[String]): Source[Option[ObjectMetadata], NotUsed] = { + private[storage] def putBlob(objectPath: String, + httpEntity: UniversalEntity, + headers: Seq[HttpHeader]): Source[Option[ObjectMetadata], NotUsed] = { Source .fromMaterializer { (mat, attr) => implicit val system: ActorSystem = mat.system val settings = resolveSettings(attr, system) - val httpEntity = HttpEntity(contentType, contentLength, payload) val request = createRequest( method = HttpMethods.PUT, @@ -165,7 +153,7 @@ object AzureStorageStream { storageType = BlobType, objectPath = objectPath, queryString = createQueryString(settings)), - headers = populateCommonHeaders(httpEntity, blobType = Some(blobType), leaseId = leaseId) + headers = headers ).withEntity(httpEntity) handlePutRequest(request, settings) } @@ -173,9 +161,7 @@ object AzureStorageStream { } private[storage] def createFile(objectPath: String, - contentType: ContentType, - maxSize: Long, - leaseId: Option[String]): Source[Option[ObjectMetadata], NotUsed] = { + headers: Seq[HttpHeader]): Source[Option[ObjectMetadata], NotUsed] = { Source .fromMaterializer { (mat, attr) => implicit val system: ActorSystem = mat.system @@ -187,32 +173,20 @@ object AzureStorageStream { storageType = FileType, objectPath = objectPath, queryString = createQueryString(settings)), - headers = Seq( - CustomContentTypeHeader(contentType), - RawHeader(XMsContentLengthHeaderKey, maxSize.toString), - RawHeader(FileTypeHeaderKey, "file") - ) ++ leaseId.map(value => RawHeader(LeaseIdHeaderKey, value)) + headers = headers ) handlePutRequest(request, settings) } .mapMaterializedValue(_ => NotUsed) } - private[storage] def updateOrClearRange(objectPath: String, - contentType: ContentType, - range: ByteRange.Slice, - payload: Option[Source[ByteString, _]], - leaseId: Option[String]): Source[Option[ObjectMetadata], NotUsed] = { + private[storage] def updateRange(objectPath: String, + httpEntity: UniversalEntity, + headers: Seq[HttpHeader]): Source[Option[ObjectMetadata], NotUsed] = { Source .fromMaterializer { (mat, attr) => implicit val system: ActorSystem = mat.system val settings = resolveSettings(attr, system) - val contentLength = range.last - range.first + 1 - val clearRange = payload.isEmpty - val writeType = if (clearRange) "clear" else "update" - val overrideContentLength = if (clearRange) Some(0L) else None - val httpEntity = - if (clearRange) HttpEntity.empty(contentType) else HttpEntity(contentType, contentLength, payload.get) val request = createRequest( method = HttpMethods.PUT, @@ -220,17 +194,33 @@ object AzureStorageStream { storageType = FileType, objectPath = objectPath, queryString = createQueryString(settings, Some("comp=range"))), - headers = populateCommonHeaders(httpEntity, - overrideContentLength, - range = Some(range), - leaseId = leaseId, - writeType = Some(writeType)) + headers = headers ).withEntity(httpEntity) handlePutRequest(request, settings) } .mapMaterializedValue(_ => NotUsed) } + private[storage] def clearRange(objectPath: String, + headers: Seq[HttpHeader]): Source[Option[ObjectMetadata], NotUsed] = { + Source + .fromMaterializer { (mat, attr) => + implicit val system: ActorSystem = mat.system + val settings = resolveSettings(attr, system) + val request = + createRequest( + method = HttpMethods.PUT, + uri = createUri(settings = settings, + storageType = FileType, + objectPath = objectPath, + queryString = createQueryString(settings, Some("comp=range"))), + headers = headers + ) + handlePutRequest(request, settings) + } + .mapMaterializedValue(_ => NotUsed) + } + private[storage] def createContainer(objectPath: String): Source[Option[ObjectMetadata], NotUsed] = Source .fromMaterializer { (mat, attr) => @@ -243,7 +233,7 @@ object AzureStorageStream { storageType = BlobType, objectPath = objectPath, queryString = createQueryString(settings, Some("restype=container"))), - headers = populateCommonHeaders(HttpEntity.Empty) + headers = StorageHeaders().headers ) handlePutRequest(request, settings) @@ -345,32 +335,6 @@ object AzureStorageStream { } } - private def populateCommonHeaders(entity: HttpEntity, - overrideContentLength: Option[Long] = None, - range: Option[ByteRange] = None, - blobType: Option[String] = None, - leaseId: Option[String] = None, - writeType: Option[String] = None) = { - // Azure required to have these two headers (Content-Length & Content-Type) in the request - // in some cases Content-Length header must be set as 0 - val contentLength = overrideContentLength.orElse(entity.contentLengthOption).getOrElse(0L) - val maybeContentLengthHeader = - if (overrideContentLength.isEmpty && contentLength == 0L) None else Some(CustomContentLengthHeader(contentLength)) - - val maybeContentTypeHeader = - emptyStringToOption(entity.contentType.toString()) match { - case Some(value) if value != "none/none" => Some(CustomContentTypeHeader(entity.contentType)) - case _ => None - } - - val maybeRangeHeader = range.map(RangeHeader(_)) - val maybeBlobTypeHeader = blobType.map(value => RawHeader(BlobTypeHeaderKey, value)) - val maybeLeaseIdHeader = leaseId.map(value => RawHeader(LeaseIdHeaderKey, value)) - val maybeWriteTypeHeader = writeType.map(value => RawHeader(FileWriteTypeHeaderKey, value)) - - (maybeContentLengthHeader ++ maybeContentTypeHeader ++ maybeRangeHeader ++ maybeBlobTypeHeader ++ maybeLeaseIdHeader ++ maybeWriteTypeHeader).toSeq - } - private def createRequest(method: HttpMethod, uri: Uri, headers: Seq[HttpHeader]) = HttpRequest(method = method, uri = uri, headers = headers) diff --git a/azure-storage/src/main/scala/akka/stream/alpakka/azure/storage/javadsl/BlobService.scala b/azure-storage/src/main/scala/akka/stream/alpakka/azure/storage/javadsl/BlobService.scala index 720f7a7d5f..0a195aa4bc 100644 --- a/azure-storage/src/main/scala/akka/stream/alpakka/azure/storage/javadsl/BlobService.scala +++ b/azure-storage/src/main/scala/akka/stream/alpakka/azure/storage/javadsl/BlobService.scala @@ -11,7 +11,8 @@ import akka.NotUsed import akka.http.javadsl.model._ import akka.http.javadsl.model.headers.ByteRange import akka.http.scaladsl.model.headers.{ByteRange => ScalaByteRange} -import akka.http.scaladsl.model.{ContentType => ScalaContentType} +import akka.http.scaladsl.model.{HttpEntity, ContentType => ScalaContentType} +import akka.stream.alpakka.azure.storage.headers.BlobTypeHeader import akka.stream.alpakka.azure.storage.impl.AzureStorageStream import akka.stream.javadsl.Source import akka.stream.scaladsl.SourceToCompletionStage @@ -38,14 +39,21 @@ object BlobService { def getBlob(objectPath: String, range: ByteRange, versionId: Optional[String], - leaseId: Optional[String]): Source[ByteString, CompletionStage[ObjectMetadata]] = { - val scalaRange = range.asInstanceOf[ScalaByteRange] + leaseId: Optional[String]): Source[ByteString, CompletionStage[ObjectMetadata]] = new Source( AzureStorageStream - .getObject(BlobType, objectPath, Some(scalaRange), Option(versionId.orElse(null)), Option(leaseId.orElse(null))) + .getObject( + BlobType, + objectPath, + Option(versionId.orElse(null)), + StorageHeaders + .create() + .withRangeHeader(range.asInstanceOf[ScalaByteRange]) + .withLeaseIdHeader(Option(leaseId.orElse(null))) + .headers + ) .toCompletionStage() ) - } /** * Gets blob representing `objectPath` with specified range (if applicable). @@ -61,7 +69,15 @@ object BlobService { leaseId: Optional[String]): Source[ByteString, CompletionStage[ObjectMetadata]] = new Source( AzureStorageStream - .getObject(BlobType, objectPath, None, Option(versionId.orElse(null)), Option(leaseId.orElse(null))) + .getObject( + BlobType, + objectPath, + Option(versionId.orElse(null)), + StorageHeaders + .create() + .withLeaseIdHeader(Option(leaseId.orElse(null))) + .headers + ) .toCompletionStage() ) @@ -78,7 +94,10 @@ object BlobService { versionId: Optional[String], leaseId: Optional[String]): Source[Optional[ObjectMetadata], NotUsed] = AzureStorageStream - .getObjectProperties(BlobType, objectPath, Option(versionId.orElse(null)), Option(leaseId.orElse(null))) + .getObjectProperties(BlobType, + objectPath, + Option(versionId.orElse(null)), + StorageHeaders.create().withLeaseIdHeader(Option(leaseId.orElse(null))).headers) .map(opt => Optional.ofNullable(opt.orNull)) .asJava @@ -95,7 +114,10 @@ object BlobService { versionId: Optional[String], leaseId: Optional[String]): Source[Optional[ObjectMetadata], NotUsed] = AzureStorageStream - .deleteObject(BlobType, objectPath, Option(versionId.orElse(null)), Option(leaseId.orElse(null))) + .deleteObject(BlobType, + objectPath, + Option(versionId.orElse(null)), + StorageHeaders.create().withLeaseIdHeader(Option(leaseId.orElse(null))).headers) .map(opt => Optional.ofNullable(opt.orNull)) .asJava @@ -106,7 +128,6 @@ object BlobService { * @param contentType content type of the blob * @param contentLength length of the blob * @param payload actual payload, a [[akka.stream.javadsl.Source Source]] of [[akka.util.ByteString ByteString]] - * @param blobType type of the blob, ''Must be one of:'' __'''BlockBlob, PageBlob, or AppendBlob'''__ * @return A [[akka.stream.javadsl.Source Source]] containing an [[scala.Option]] of * [[akka.stream.alpakka.azure.storage.ObjectMetadata]], will be [[scala.None]] in case the object does not exist */ @@ -114,15 +135,19 @@ object BlobService { contentType: ContentType, contentLength: Long, payload: Source[ByteString, _], - blobType: String = "BlockBlob", leaseId: Optional[String]): Source[Optional[ObjectMetadata], NotUsed] = AzureStorageStream - .putBlob(blobType, - objectPath, - contentType.asInstanceOf[ScalaContentType], - contentLength, - payload.asScala, - Option(leaseId.orElse(null))) + .putBlob( + objectPath, + HttpEntity(contentType.asInstanceOf[ScalaContentType], contentLength, payload.asScala), + StorageHeaders + .create() + .withContentLengthHeader(contentLength) + .withContentTypeHeader(contentType.asInstanceOf[ScalaContentType]) + .withLeaseIdHeader(Option(leaseId.orElse(null))) + .withBlobTypeHeader(BlobTypeHeader.BlockBlobHeader) + .headers + ) .map(opt => Optional.ofNullable(opt.orNull)) .asJava diff --git a/azure-storage/src/main/scala/akka/stream/alpakka/azure/storage/javadsl/FileService.scala b/azure-storage/src/main/scala/akka/stream/alpakka/azure/storage/javadsl/FileService.scala index fe6c22898c..1300f1a75c 100644 --- a/azure-storage/src/main/scala/akka/stream/alpakka/azure/storage/javadsl/FileService.scala +++ b/azure-storage/src/main/scala/akka/stream/alpakka/azure/storage/javadsl/FileService.scala @@ -12,7 +12,8 @@ import akka.http.javadsl.model._ import akka.http.javadsl.model.headers.ByteRange import akka.http.scaladsl.model.headers.ByteRange.Slice import akka.http.scaladsl.model.headers.{ByteRange => ScalaByteRange} -import akka.http.scaladsl.model.{ContentType => ScalaContentType} +import akka.http.scaladsl.model.{HttpEntity, ContentType => ScalaContentType} +import akka.stream.alpakka.azure.storage.headers.FileWriteTypeHeader import akka.stream.alpakka.azure.storage.impl.AzureStorageStream import akka.stream.javadsl.Source import akka.stream.scaladsl.SourceToCompletionStage @@ -39,14 +40,21 @@ object FileService { def getFile(objectPath: String, range: ByteRange, versionId: Optional[String], - leaseId: Optional[String]): Source[ByteString, CompletionStage[ObjectMetadata]] = { - val scalaRange = range.asInstanceOf[ScalaByteRange] + leaseId: Optional[String]): Source[ByteString, CompletionStage[ObjectMetadata]] = new Source( AzureStorageStream - .getObject(FileType, objectPath, Some(scalaRange), Option(versionId.orElse(null)), Option(leaseId.orElse(null))) + .getObject( + FileType, + objectPath, + Option(versionId.orElse(null)), + StorageHeaders + .create() + .withRangeHeader(range.asInstanceOf[ScalaByteRange]) + .withLeaseIdHeader(Option(leaseId.orElse(null))) + .headers + ) .toCompletionStage() ) - } /** * Gets file representing `objectPath` with specified range (if applicable). @@ -62,7 +70,15 @@ object FileService { leaseId: Optional[String]): Source[ByteString, CompletionStage[ObjectMetadata]] = { new Source( AzureStorageStream - .getObject(FileType, objectPath, None, Option(versionId.orElse(null)), Option(leaseId.orElse(null))) + .getObject( + FileType, + objectPath, + Option(versionId.orElse(null)), + StorageHeaders + .create() + .withLeaseIdHeader(Option(leaseId.orElse(null))) + .headers + ) .toCompletionStage() ) } @@ -80,7 +96,10 @@ object FileService { versionId: Optional[String], leaseId: Optional[String]): Source[Optional[ObjectMetadata], NotUsed] = AzureStorageStream - .getObjectProperties(FileType, objectPath, Option(versionId.orElse(null)), Option(leaseId.orElse(null))) + .getObjectProperties(FileType, + objectPath, + Option(versionId.orElse(null)), + StorageHeaders.create().withLeaseIdHeader(Option(leaseId.orElse(null))).headers) .map(opt => Optional.ofNullable(opt.orNull)) .asJava @@ -97,7 +116,10 @@ object FileService { versionId: Optional[String], leaseId: Optional[String]): Source[Optional[ObjectMetadata], NotUsed] = AzureStorageStream - .deleteObject(FileType, objectPath, Option(versionId.orElse(null)), Option(leaseId.orElse(null))) + .deleteObject(FileType, + objectPath, + Option(versionId.orElse(null)), + StorageHeaders.create().withLeaseIdHeader(Option(leaseId.orElse(null))).headers) .map(opt => Optional.ofNullable(opt.orNull)) .asJava @@ -116,7 +138,16 @@ object FileService { maxSize: Long, leaseId: Optional[String]): Source[Optional[ObjectMetadata], NotUsed] = AzureStorageStream - .createFile(objectPath, contentType.asInstanceOf[ScalaContentType], maxSize, Option(leaseId.orElse(null))) + .createFile( + objectPath, + StorageHeaders + .create() + .withContentTypeHeader(contentType.asInstanceOf[ScalaContentType]) + .withFileMaxContentLengthHeader(maxSize) + .withFileTypeHeader() + .withLeaseIdHeader(Option(leaseId.orElse(null))) + .headers + ) .map(opt => Optional.ofNullable(opt.orNull)) .asJava @@ -135,15 +166,24 @@ object FileService { contentType: ContentType, range: Slice, payload: Source[ByteString, _], - leaseId: Optional[String]): Source[Optional[ObjectMetadata], NotUsed] = + leaseId: Optional[String]): Source[Optional[ObjectMetadata], NotUsed] = { + val contentLength = range.last - range.first + 1 AzureStorageStream - .updateOrClearRange(objectPath, - contentType.asInstanceOf[ScalaContentType], - range, - Some(payload.asScala), - Option(leaseId.orElse(null))) + .updateRange( + objectPath, + HttpEntity(contentType.asInstanceOf[ScalaContentType], contentLength, payload.asScala), + StorageHeaders + .create() + .withContentLengthHeader(contentLength) + .withContentTypeHeader(contentType.asInstanceOf[ScalaContentType]) + .withRangeHeader(range) + .withLeaseIdHeader(Option(leaseId.orElse(null))) + .withFileWriteTypeHeader(FileWriteTypeHeader.UpdateFileHeader) + .headers + ) .map(opt => Optional.ofNullable(opt.orNull)) .asJava + } /** * Clears specified range from the file. @@ -158,11 +198,16 @@ object FileService { range: Slice, leaseId: Optional[String]): Source[Optional[ObjectMetadata], NotUsed] = AzureStorageStream - .updateOrClearRange(objectPath, - ContentTypes.NO_CONTENT_TYPE.asInstanceOf[ScalaContentType], - range, - None, - Option(leaseId.orElse(null))) + .clearRange( + objectPath, + StorageHeaders + .create() + .withContentLengthHeader(0L) + .withRangeHeader(range) + .withLeaseIdHeader(Option(leaseId.orElse(null))) + .withFileWriteTypeHeader(FileWriteTypeHeader.ClearFileHeader) + .headers + ) .map(opt => Optional.ofNullable(opt.orNull)) .asJava } diff --git a/azure-storage/src/main/scala/akka/stream/alpakka/azure/storage/scaladsl/BlobService.scala b/azure-storage/src/main/scala/akka/stream/alpakka/azure/storage/scaladsl/BlobService.scala index 9a0c77fe79..07bb35f680 100644 --- a/azure-storage/src/main/scala/akka/stream/alpakka/azure/storage/scaladsl/BlobService.scala +++ b/azure-storage/src/main/scala/akka/stream/alpakka/azure/storage/scaladsl/BlobService.scala @@ -8,8 +8,9 @@ package storage package scaladsl import akka.NotUsed -import akka.http.scaladsl.model.{ContentType, ContentTypes} +import akka.http.scaladsl.model.{ContentType, ContentTypes, HttpEntity} import akka.http.scaladsl.model.headers.ByteRange +import akka.stream.alpakka.azure.storage.headers.BlobTypeHeader import akka.stream.alpakka.azure.storage.impl.AzureStorageStream import akka.stream.scaladsl.Source import akka.util.ByteString @@ -35,7 +36,10 @@ object BlobService { range: Option[ByteRange] = None, versionId: Option[String] = None, leaseId: Option[String] = None): Source[ByteString, Future[ObjectMetadata]] = - AzureStorageStream.getObject(BlobType, objectPath, range, versionId, leaseId) + AzureStorageStream.getObject(BlobType, + objectPath, + versionId, + StorageHeaders().withRangeHeader(range).withLeaseIdHeader(leaseId).headers) /** * Gets blob properties. @@ -49,7 +53,10 @@ object BlobService { def getProperties(objectPath: String, versionId: Option[String] = None, leaseId: Option[String] = None): Source[Option[ObjectMetadata], NotUsed] = - AzureStorageStream.getObjectProperties(BlobType, objectPath, versionId, leaseId) + AzureStorageStream.getObjectProperties(BlobType, + objectPath, + versionId, + StorageHeaders().withLeaseIdHeader(leaseId).headers) /** * Deletes blob. @@ -63,7 +70,10 @@ object BlobService { def deleteBlob(objectPath: String, versionId: Option[String] = None, leaseId: Option[String] = None): Source[Option[ObjectMetadata], NotUsed] = - AzureStorageStream.deleteObject(BlobType, objectPath, versionId, leaseId) + AzureStorageStream.deleteObject(BlobType, + objectPath, + versionId, + StorageHeaders().withLeaseIdHeader(leaseId).headers) /** * Put blob. @@ -72,7 +82,6 @@ object BlobService { * @param contentType content type of the blob * @param contentLength length of the blob * @param payload actual payload, a [[akka.stream.scaladsl.Source Source]] of [[akka.util.ByteString ByteString]] - * @param blobType type of the blob, ''Must be one of:'' __'''BlockBlob, PageBlob, or AppendBlob'''__ * @return A [[akka.stream.scaladsl.Source Source]] containing an [[scala.Option]] of * [[akka.stream.alpakka.azure.storage.ObjectMetadata]], will be [[scala.None]] in case the object does not exist */ @@ -80,9 +89,17 @@ object BlobService { contentType: ContentType = ContentTypes.`application/octet-stream`, contentLength: Long, payload: Source[ByteString, _], - blobType: String = "BlockBlob", leaseId: Option[String] = None): Source[Option[ObjectMetadata], NotUsed] = - AzureStorageStream.putBlob(blobType, objectPath, contentType, contentLength, payload, leaseId) + AzureStorageStream.putBlob( + objectPath, + HttpEntity(contentType, contentLength, payload), + StorageHeaders() + .withContentLengthHeader(contentLength) + .withContentTypeHeader(contentType) + .withBlobTypeHeader(BlobTypeHeader.BlockBlobHeader) + .withLeaseIdHeader(leaseId) + .headers + ) /** * Create container. diff --git a/azure-storage/src/main/scala/akka/stream/alpakka/azure/storage/scaladsl/FileService.scala b/azure-storage/src/main/scala/akka/stream/alpakka/azure/storage/scaladsl/FileService.scala index 673dc3edd5..a6d0a4fb6c 100644 --- a/azure-storage/src/main/scala/akka/stream/alpakka/azure/storage/scaladsl/FileService.scala +++ b/azure-storage/src/main/scala/akka/stream/alpakka/azure/storage/scaladsl/FileService.scala @@ -8,8 +8,9 @@ package storage package scaladsl import akka.NotUsed -import akka.http.scaladsl.model.{ContentType, ContentTypes} +import akka.http.scaladsl.model.{ContentType, ContentTypes, HttpEntity} import akka.http.scaladsl.model.headers.ByteRange +import akka.stream.alpakka.azure.storage.headers.FileWriteTypeHeader import akka.stream.alpakka.azure.storage.impl.AzureStorageStream import akka.stream.scaladsl.Source import akka.util.ByteString @@ -35,7 +36,10 @@ object FileService { range: Option[ByteRange] = None, versionId: Option[String] = None, leaseId: Option[String] = None): Source[ByteString, Future[ObjectMetadata]] = - AzureStorageStream.getObject(FileType, objectPath, range, versionId, leaseId) + AzureStorageStream.getObject(FileType, + objectPath, + versionId, + StorageHeaders().withRangeHeader(range).withLeaseIdHeader(leaseId).headers) /** * Gets file properties. @@ -49,7 +53,10 @@ object FileService { def getProperties(objectPath: String, versionId: Option[String] = None, leaseId: Option[String] = None): Source[Option[ObjectMetadata], NotUsed] = - AzureStorageStream.getObjectProperties(FileType, objectPath, versionId, leaseId) + AzureStorageStream.getObjectProperties(FileType, + objectPath, + versionId, + StorageHeaders().withLeaseIdHeader(leaseId).headers) /** * Deletes file. @@ -63,7 +70,10 @@ object FileService { def deleteFile(objectPath: String, versionId: Option[String] = None, leaseId: Option[String] = None): Source[Option[ObjectMetadata], NotUsed] = - AzureStorageStream.deleteObject(FileType, objectPath, versionId, leaseId) + AzureStorageStream.deleteObject(FileType, + objectPath, + versionId, + StorageHeaders().withLeaseIdHeader(leaseId).headers) /** * Creates a file. @@ -79,7 +89,15 @@ object FileService { contentType: ContentType = ContentTypes.`application/octet-stream`, maxSize: Long, leaseId: Option[String] = None): Source[Option[ObjectMetadata], NotUsed] = - AzureStorageStream.createFile(objectPath, contentType, maxSize, leaseId) + AzureStorageStream.createFile( + objectPath, + StorageHeaders() + .withContentTypeHeader(contentType) + .withFileMaxContentLengthHeader(maxSize) + .withFileTypeHeader() + .withLeaseIdHeader(leaseId) + .headers + ) /** * Updates file on the specified range. @@ -96,8 +114,20 @@ object FileService { contentType: ContentType = ContentTypes.`application/octet-stream`, range: ByteRange.Slice, payload: Source[ByteString, _], - leaseId: Option[String] = None): Source[Option[ObjectMetadata], NotUsed] = - AzureStorageStream.updateOrClearRange(objectPath, contentType, range, Some(payload), leaseId) + leaseId: Option[String] = None): Source[Option[ObjectMetadata], NotUsed] = { + val contentLength = range.last - range.first + 1 + AzureStorageStream.updateRange( + objectPath, + HttpEntity(contentType, contentLength, payload), + StorageHeaders() + .withContentLengthHeader(contentLength) + .withContentTypeHeader(contentType) + .withRangeHeader(range) + .withLeaseIdHeader(leaseId) + .withFileWriteTypeHeader(FileWriteTypeHeader.UpdateFileHeader) + .headers + ) + } /** * Clears specified range from the file. @@ -111,5 +141,13 @@ object FileService { def clearRange(objectPath: String, range: ByteRange.Slice, leaseId: Option[String] = None): Source[Option[ObjectMetadata], NotUsed] = - AzureStorageStream.updateOrClearRange(objectPath, ContentTypes.NoContentType, range, None, leaseId) + AzureStorageStream.clearRange( + objectPath, + StorageHeaders() + .withContentLengthHeader(0L) + .withRangeHeader(range) + .withLeaseIdHeader(leaseId) + .withFileWriteTypeHeader(FileWriteTypeHeader.ClearFileHeader) + .headers + ) } diff --git a/azure-storage/src/test/java/docs/javadsl/StorageTest.java b/azure-storage/src/test/java/docs/javadsl/StorageTest.java index 50e056a17f..9092d369ff 100644 --- a/azure-storage/src/test/java/docs/javadsl/StorageTest.java +++ b/azure-storage/src/test/java/docs/javadsl/StorageTest.java @@ -85,7 +85,7 @@ public void putBlob() throws Exception { ContentTypes.TEXT_PLAIN_UTF8, contentLength(), Source.single(ByteString.fromString(payload())), - "BlockBlob", Optional.empty()); + Optional.empty()); final CompletionStage> optionalCompletionStage = source.runWith(Sink.head(), system); //#put-blob