Skip to content

Commit

Permalink
[CLIENT-3310] PutPayload method ignores Policy values
Browse files Browse the repository at this point in the history
  • Loading branch information
khaf committed Feb 21, 2025
1 parent 36b25aa commit 485692f
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 4 deletions.
8 changes: 6 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,14 @@ func (clnt *Client) GetNodeNames() []string {

// PutPayload writes the raw write/delete payload to the server.
// The policy specifies the transaction timeout.
// If the policy is nil, the default relevant policy will be used.
// If the policy is nil, the default relevant policy will be used, but the message
// header data regarding generation, expiration and others will not be touched.
// Values for `policy.FilterExpression` or `policy.Txn` will always be ignored.
// If `policy.Expiration` is set to `TTLDontUpdate`, the value will not be updated on the payload.
func (clnt *Client) PutPayload(policy *WritePolicy, key *Key, payload []byte) Error {
ogPolicy := policy
policy = clnt.getUsableWritePolicy(policy)
command, err := newWritePayloadCommand(clnt.cluster, policy, key, payload)
command, err := newWritePayloadCommand(clnt.cluster, policy, ogPolicy, key, payload)
if err != nil {
return err
}
Expand Down
38 changes: 38 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,44 @@ var _ = gg.Describe("Aerospike", func() {
gm.Expect(exists).To(gm.BeFalse())
})

gg.It("must support policy values", func() {
key, err = as.NewKey(ns, set, 0)
gm.Expect(err).ToNot(gm.HaveOccurred())

client.Delete(nil, key)
gm.Expect(err).ToNot(gm.HaveOccurred())

binMap := as.BinMap{
"Aerospike": "value",
"Aerospike1": "value2",
}

err := client.Put(nil, key, binMap)
gm.Expect(err).ToNot(gm.HaveOccurred())

exists, err := client.Exists(nil, key)
gm.Expect(err).ToNot(gm.HaveOccurred())
gm.Expect(exists).To(gm.BeTrue())

binMap = as.BinMap{
"Aerospike": "value",
"Aerospike1": "value2",
"Aerospike2": "value3",
}
wcmd, err := as.NewWriteCommand(nil, wpolicy, key, nil, binMap)
gm.Expect(err).ToNot(gm.HaveOccurred())

err = wcmd.WriteBuffer(&wcmd)
gm.Expect(err).ToNot(gm.HaveOccurred())
payload := wcmd.Buffer()

wp := as.NewWritePolicy(0, 0)
wp.GenerationPolicy = as.EXPECT_GEN_EQUAL
err = client.PutPayload(wp, key, payload)
gm.Expect(err).To(gm.HaveOccurred())
gm.Expect(err.Matches(ast.GENERATION_ERROR)).To(gm.BeTrue())
})

})

gg.Context("Put operations", func() {
Expand Down
66 changes: 64 additions & 2 deletions write_payload_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ var _ command = &writePayloadCommand{}
type writePayloadCommand struct {
singleCommand

policy *WritePolicy
payload []byte
policy *WritePolicy
ogPolicy *WritePolicy
payload []byte
}

func newWritePayloadCommand(
cluster *Cluster,
policy *WritePolicy,
ogPolicy *WritePolicy,
key *Key,
payload []byte,
) (writePayloadCommand, Error) {
Expand All @@ -49,6 +51,7 @@ func newWritePayloadCommand(
newWriteCmd := writePayloadCommand{
singleCommand: newSingleCommand(cluster, key, partition),
policy: policy,
ogPolicy: ogPolicy,
payload: payload,
}

Expand All @@ -61,10 +64,69 @@ func (cmd *writePayloadCommand) getPolicy(ifc command) Policy {

func (cmd *writePayloadCommand) writeBuffer(ifc command) Error {
cmd.dataBuffer = cmd.payload
cmd.applyPolicy()
cmd.dataOffset = len(cmd.payload)
return nil
}

// Header write for write commands.
func (cmd *writePayloadCommand) applyPolicy() {
if cmd.ogPolicy == nil {
return
}

policy := cmd.ogPolicy

// Set flags.

generation := uint32(Buffer.BytesToInt32(cmd.dataBuffer, 14))
writeAttr := _INFO2_WRITE
readAttr := 0
infoAttr := 0

switch policy.RecordExistsAction {
case UPDATE:
case UPDATE_ONLY:
infoAttr |= _INFO3_UPDATE_ONLY
case REPLACE:
infoAttr |= _INFO3_CREATE_OR_REPLACE
case REPLACE_ONLY:
infoAttr |= _INFO3_REPLACE_ONLY
case CREATE_ONLY:
writeAttr |= _INFO2_CREATE_ONLY
}

switch policy.GenerationPolicy {
case NONE:
case EXPECT_GEN_EQUAL:
generation = policy.Generation
writeAttr |= _INFO2_GENERATION
case EXPECT_GEN_GT:
generation = policy.Generation
writeAttr |= _INFO2_GENERATION_GT
}

if policy.CommitLevel == COMMIT_MASTER {
infoAttr |= _INFO3_COMMIT_MASTER
}

if policy.DurableDelete {
writeAttr |= _INFO2_DURABLE_DELETE
}

// cmd.dataBuffer[8] = _MSG_REMAINING_HEADER_SIZE // Message header length.
cmd.dataBuffer[9] = byte(readAttr)
cmd.dataBuffer[10] = byte(writeAttr)
cmd.dataBuffer[11] = byte(infoAttr)
// cmd.dataBuffer[12] = byte(txnAttr)
cmd.dataBuffer[13] = 0 // clear the result code
cmd.dataOffset = 14
cmd.WriteUint32(generation)
if policy.Expiration != TTLDontUpdate {
cmd.WriteUint32(policy.Expiration)
}
}

func (cmd *writePayloadCommand) getNode(ifc command) (*Node, Error) {
return cmd.partition.GetNodeWrite(cmd.cluster)
}
Expand Down

0 comments on commit 485692f

Please sign in to comment.