Skip to content

Commit

Permalink
Optimizing the scope of RPC base classes (apache#15946)
Browse files Browse the repository at this point in the history
* Optimizing the scope of RPC base classes

* Fix UT
  • Loading branch information
ruanwenjun authored May 8, 2024
1 parent d218b02 commit 99d8276
Show file tree
Hide file tree
Showing 28 changed files with 301 additions and 359 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.alert.rpc;

import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.extract.base.NettyRemotingServerFactory;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;

Expand All @@ -31,20 +30,7 @@
public class AlertRpcServer extends SpringServerMethodInvokerDiscovery implements AutoCloseable {

public AlertRpcServer(AlertConfig alertConfig) {
super(NettyRemotingServerFactory.buildNettyRemotingServer(
NettyServerConfig.builder().serverName("AlertRpcServer").listenPort(alertConfig.getPort()).build()));
super(NettyServerConfig.builder().serverName("AlertRpcServer").listenPort(alertConfig.getPort()).build());
}

public void start() {
log.info("Starting AlertRpcServer...");
nettyRemotingServer.start();
log.info("Started AlertRpcServer...");
}

@Override
public void close() {
log.info("Closing AlertRpcServer...");
nettyRemotingServer.close();
log.info("Closed AlertRpcServer...");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,24 @@
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.processor;
package org.apache.dolphinscheduler.alert.rpc;

import org.apache.dolphinscheduler.server.master.utils.DataQualityResultOperator;
import org.apache.dolphinscheduler.alert.config.AlertConfig;

import org.mockito.Mockito;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.junit.jupiter.api.Test;

/**
* dependency config
*/
@Configuration
public class TaskResponseProcessorTestConfig {
class AlertRpcServerTest {

private final AlertRpcServer alertRpcServer = new AlertRpcServer(new AlertConfig());

@Bean
public DataQualityResultOperator dataQualityResultOperator() {
return Mockito.mock(DataQualityResultOperator.class);
@Test
void testStart() {
alertRpcServer.start();
}

@Test
void testClose() {
alertRpcServer.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.extract.base.NettyRemotingServer;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery;
import org.apache.dolphinscheduler.extract.common.ILogService;
Expand Down Expand Up @@ -91,7 +90,7 @@ public class LoggerServiceTest {
@Mock
private TaskDefinitionMapper taskDefinitionMapper;

private NettyRemotingServer nettyRemotingServer;
private SpringServerMethodInvokerDiscovery springServerMethodInvokerDiscovery;

private int nettyServerPort = 18080;

Expand All @@ -103,11 +102,10 @@ public void setUp() {
return;
}

nettyRemotingServer = new NettyRemotingServer(NettyServerConfig.builder().listenPort(nettyServerPort).build());
nettyRemotingServer.start();
SpringServerMethodInvokerDiscovery springServerMethodInvokerDiscovery =
new SpringServerMethodInvokerDiscovery(nettyRemotingServer);
springServerMethodInvokerDiscovery.postProcessAfterInitialization(new ILogService() {
springServerMethodInvokerDiscovery = new SpringServerMethodInvokerDiscovery(
NettyServerConfig.builder().serverName("TestLogServer").listenPort(nettyServerPort).build());
springServerMethodInvokerDiscovery.start();
springServerMethodInvokerDiscovery.registerServerMethodInvokerProvider(new ILogService() {

@Override
public TaskInstanceLogFileDownloadResponse getTaskInstanceWholeLogFileBytes(TaskInstanceLogFileDownloadRequest taskInstanceLogFileDownloadRequest) {
Expand Down Expand Up @@ -142,13 +140,14 @@ public GetAppIdResponse getAppId(GetAppIdRequest getAppIdRequest) {
public void removeTaskInstanceLog(String taskInstanceLogAbsolutePath) {

}
}, "iLogServiceImpl");
});
springServerMethodInvokerDiscovery.start();
}

@AfterEach
public void tearDown() {
if (nettyRemotingServer != null) {
nettyRemotingServer.close();
if (springServerMethodInvokerDiscovery != null) {
springServerMethodInvokerDiscovery.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@

package org.apache.dolphinscheduler.extract.base.client;

import org.apache.dolphinscheduler.extract.base.NettyRemotingClient;
import org.apache.dolphinscheduler.extract.base.utils.Host;

import java.lang.reflect.Method;

public abstract class BaseRemoteMethodInvoker implements ClientMethodInvoker {
abstract class AbstractClientMethodInvoker implements ClientMethodInvoker {

protected final String methodIdentifier;

Expand All @@ -32,7 +31,7 @@ public abstract class BaseRemoteMethodInvoker implements ClientMethodInvoker {

protected final Host serverHost;

public BaseRemoteMethodInvoker(Host serverHost, Method localMethod, NettyRemotingClient nettyRemotingClient) {
AbstractClientMethodInvoker(Host serverHost, Method localMethod, NettyRemotingClient nettyRemotingClient) {
this.serverHost = serverHost;
this.localMethod = localMethod;
this.nettyRemotingClient = nettyRemotingClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import static com.google.common.base.Preconditions.checkNotNull;

import org.apache.dolphinscheduler.extract.base.NettyRemotingClient;
import org.apache.dolphinscheduler.extract.base.RpcMethod;
import org.apache.dolphinscheduler.extract.base.utils.Host;

Expand All @@ -31,15 +30,15 @@
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ClientInvocationHandler implements InvocationHandler {
class ClientInvocationHandler implements InvocationHandler {

private final NettyRemotingClient nettyRemotingClient;

private final Map<String, ClientMethodInvoker> methodInvokerMap;

private final Host serverHost;

public ClientInvocationHandler(Host serverHost, NettyRemotingClient nettyRemotingClient) {
ClientInvocationHandler(Host serverHost, NettyRemotingClient nettyRemotingClient) {
this.serverHost = checkNotNull(serverHost);
this.nettyRemotingClient = checkNotNull(nettyRemotingClient);
this.methodInvokerMap = new ConcurrentHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import java.lang.reflect.Method;

public interface ClientMethodInvoker {
interface ClientMethodInvoker {

Object invoke(Object proxy, Method method, Object[] args) throws Throwable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.dolphinscheduler.extract.base.client;

public interface IRpcClientProxyFactory {
interface IRpcClientProxyFactory {

/**
* Create the client proxy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.dolphinscheduler.extract.base.client;

import org.apache.dolphinscheduler.extract.base.NettyRemotingClient;
import org.apache.dolphinscheduler.extract.base.utils.Host;

import java.lang.reflect.Proxy;
Expand All @@ -34,7 +33,7 @@
/**
* This class is used to create a proxy client which will transform local method invocation to remove invocation.
*/
public class JdkDynamicRpcClientProxyFactory implements IRpcClientProxyFactory {
class JdkDynamicRpcClientProxyFactory implements IRpcClientProxyFactory {

private final NettyRemotingClient nettyRemotingClient;

Expand All @@ -49,7 +48,7 @@ public Map<String, Object> load(String key) {
}
});

public JdkDynamicRpcClientProxyFactory(NettyRemotingClient nettyRemotingClient) {
JdkDynamicRpcClientProxyFactory(NettyRemotingClient nettyRemotingClient) {
this.nettyRemotingClient = nettyRemotingClient;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,15 @@
* limitations under the License.
*/

package org.apache.dolphinscheduler.extract.base;
package org.apache.dolphinscheduler.extract.base.client;

import org.apache.dolphinscheduler.extract.base.StandardRpcResponse;
import org.apache.dolphinscheduler.extract.base.future.ResponseFuture;
import org.apache.dolphinscheduler.extract.base.protocal.HeartBeatTransporter;
import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
import org.apache.dolphinscheduler.extract.base.serialize.JsonSerializer;
import org.apache.dolphinscheduler.extract.base.utils.ChannelUtils;

import java.util.concurrent.ExecutorService;

import lombok.extern.slf4j.Slf4j;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
Expand All @@ -38,11 +37,8 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {

private final NettyRemotingClient nettyRemotingClient;

private final ExecutorService callbackExecutor;

public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor) {
public NettyClientHandler(NettyRemotingClient nettyRemotingClient) {
this.nettyRemotingClient = nettyRemotingClient;
this.callbackExecutor = callbackExecutor;
}

@Override
Expand All @@ -64,13 +60,7 @@ private void processReceived(final Transporter transporter) {
}
StandardRpcResponse deserialize = JsonSerializer.deserialize(transporter.getBody(), StandardRpcResponse.class);
future.setIRpcResponse(deserialize);
future.release();
if (future.getInvokeCallback() != null) {
future.removeFuture();
this.callbackExecutor.execute(future::executeInvokeCallback);
} else {
future.putResponse(deserialize);
}
future.putResponse(deserialize);
}

@Override
Expand Down
Loading

0 comments on commit 99d8276

Please sign in to comment.