Skip to content

Commit

Permalink
bugfix: fix Alibaba Dubbo convert error (#6675)
Browse files Browse the repository at this point in the history
  • Loading branch information
slievrly authored Jul 13, 2024
1 parent 00fdb54 commit f8dfd1f
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.integration.dubbo.alibaba;

import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;

import org.apache.seata.core.constants.DubboConstants;
import org.apache.seata.core.context.RootContext;
import org.apache.seata.core.model.BranchType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The type Alibaba dubbo transaction consumer filter.
*/
@Activate(group = {DubboConstants.CONSUMER}, order = 100)
public class AlibabaDubboTransactionConsumerFilter implements Filter {

private static final Logger LOGGER = LoggerFactory.getLogger(AlibabaDubboTransactionConsumerFilter.class);

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (!DubboConstants.ALIBABADUBBO) {
return invoker.invoke(invocation);
}
String xid = RootContext.getXID();
BranchType branchType = RootContext.getBranchType();

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("consumer xid in RootContext[{}], branchType in RootContext[{}]", xid, branchType);
}
if (xid != null) {
RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
RpcContext.getContext().setAttachment(RootContext.KEY_BRANCH_TYPE, branchType.name());
}
try {
return invoker.invoke(invocation);
} finally {
RpcContext.getContext().removeAttachment(RootContext.KEY_XID);
RpcContext.getContext().removeAttachment(RootContext.KEY_BRANCH_TYPE);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,51 +23,41 @@
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;

import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.context.RootContext;
import org.apache.seata.core.constants.DubboConstants;
import org.apache.seata.core.context.RootContext;
import org.apache.seata.core.model.BranchType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The type Transaction propagation filter.
*
* The type Alibaba dubbo transaction provider filter.
*/
@Activate(group = {DubboConstants.PROVIDER, DubboConstants.CONSUMER}, order = 100)
public class AlibabaDubboTransactionPropagationFilter implements Filter {
@Activate(group = {DubboConstants.PROVIDER}, order = 100)
public class AlibabaDubboTransactionProviderFilter implements Filter {

private static final Logger LOGGER = LoggerFactory.getLogger(AlibabaDubboTransactionPropagationFilter.class);
private static final Logger LOGGER = LoggerFactory.getLogger(AlibabaDubboTransactionProviderFilter.class);

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (!DubboConstants.ALIBABADUBBO) {
return invoker.invoke(invocation);
}
String xid = RootContext.getXID();
BranchType branchType = RootContext.getBranchType();

String rpcXid = getRpcXid();
String rpcBranchType = RpcContext.getContext().getAttachment(RootContext.KEY_BRANCH_TYPE);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid in RootContext[{}] xid in RpcContext[{}]", xid, rpcXid);
LOGGER.debug("xid in RpcContext[{}], branchType in RpcContext[{}]", rpcXid, rpcBranchType);
}
boolean bind = false;
if (xid != null) {
RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
RpcContext.getContext().setAttachment(RootContext.KEY_BRANCH_TYPE, branchType.name());
} else {
if (rpcXid != null) {
RootContext.bind(rpcXid);
if (StringUtils.equals(BranchType.TCC.name(), rpcBranchType)) {
RootContext.bindBranchType(BranchType.TCC);
}
bind = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind xid [{}] branchType [{}] to RootContext", rpcXid, rpcBranchType);
}
if (rpcXid != null) {
RootContext.bind(rpcXid);
if (StringUtils.equals(BranchType.TCC.name(), rpcBranchType)) {
RootContext.bindBranchType(BranchType.TCC);
}
bind = true;
}

try {
return invoker.invoke(invocation);
} finally {
Expand All @@ -82,7 +72,7 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
}
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
LOGGER.warn("xid in change during RPC from {} to {},branchType from {} to {}", rpcXid, unbindXid,
rpcBranchType != null ? rpcBranchType : BranchType.AT,previousBranchType);
rpcBranchType != null ? rpcBranchType : BranchType.AT, previousBranchType);
if (unbindXid != null) {
RootContext.bind(unbindXid);
LOGGER.warn("bind xid [{}] back to RootContext", unbindXid);
Expand All @@ -93,15 +83,14 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
}
}
}
RpcContext.getContext().removeAttachment(RootContext.KEY_XID);
RpcContext.getContext().removeAttachment(RootContext.KEY_BRANCH_TYPE);
RpcContext.getServerContext().removeAttachment(RootContext.KEY_XID);
RpcContext.getServerContext().removeAttachment(RootContext.KEY_BRANCH_TYPE);
}
}

/**
* get rpc xid
*
* @return
*/
private String getRpcXid() {
Expand All @@ -111,5 +100,4 @@ private String getRpcXid() {
}
return rpcXid;
}

}
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
org.apache.seata.integration.dubbo.alibaba.AlibabaDubboTransactionPropagationFilter
org.apache.seata.integration.dubbo.alibaba.AlibabaDubboTransactionConsumerFilter
org.apache.seata.integration.dubbo.alibaba.AlibabaDubboTransactionProviderFilter
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,26 @@
package org.apache.seata.integration.dubbo.alibaba;

import com.alibaba.dubbo.rpc.RpcContext;

import org.apache.seata.core.context.RootContext;
import org.apache.seata.core.model.BranchType;
import org.apache.seata.integration.dubbo.alibaba.mock.MockInvoker;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;


public class AlibabaDubboTransactionPropagationFilterTest {

private static final String DEFAULT_XID = "1234567890";

@Test
public void testInvoke_And_RootContext() {
AlibabaDubboTransactionPropagationFilter filter = new AlibabaDubboTransactionPropagationFilter();
AlibabaDubboTransactionProviderFilter providerFilter = new AlibabaDubboTransactionProviderFilter();

// SAGA
RpcContext.getContext().setAttachment(RootContext.KEY_XID, DEFAULT_XID);
RpcContext.getContext().setAttachment(RootContext.KEY_BRANCH_TYPE, BranchType.SAGA.name());
filter.invoke(new MockInvoker(() -> {
providerFilter.invoke(new MockInvoker(() -> {
assertThat(RootContext.getXID()).isEqualTo(DEFAULT_XID);
assertThat(RootContext.getBranchType()).isEqualTo(BranchType.AT);
}), null);
Expand All @@ -46,19 +46,20 @@ public void testInvoke_And_RootContext() {
// TCC
RpcContext.getContext().setAttachment(RootContext.KEY_XID, DEFAULT_XID);
RpcContext.getContext().setAttachment(RootContext.KEY_BRANCH_TYPE, BranchType.TCC.name());
filter.invoke(new MockInvoker(() -> {
providerFilter.invoke(new MockInvoker(() -> {
assertThat(RootContext.getXID()).isEqualTo(DEFAULT_XID);
assertThat(RootContext.getBranchType()).isEqualTo(BranchType.TCC);
}), null);
assertThat(RootContext.unbind()).isNull();
assertThat(RootContext.unbindBranchType()).isNull();

// TCC
AlibabaDubboTransactionConsumerFilter consumerFilter = new AlibabaDubboTransactionConsumerFilter();
RootContext.bind(DEFAULT_XID);
RootContext.bindBranchType(BranchType.SAGA);
RpcContext.getContext().setAttachment(RootContext.KEY_XID, DEFAULT_XID);
RpcContext.getContext().setAttachment(RootContext.KEY_BRANCH_TYPE, BranchType.TCC.name());
filter.invoke(new MockInvoker(() -> {
consumerFilter.invoke(new MockInvoker(() -> {
assertThat(RootContext.getXID()).isEqualTo(DEFAULT_XID);
assertThat(RootContext.getBranchType()).isEqualTo(BranchType.SAGA);
}), null);
Expand Down

0 comments on commit f8dfd1f

Please sign in to comment.