Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Question]In the streaming mode of grpc, docking with jwt failed #3405

Open
zzwl0219 opened this issue Aug 23, 2024 · 6 comments
Open

[Question]In the streaming mode of grpc, docking with jwt failed #3405

zzwl0219 opened this issue Aug 23, 2024 · 6 comments
Labels
question Further information is requested

Comments

@zzwl0219
Copy link

在grpc的流模式下,发送jwt,服务端无法从context中获取对应的metadata

客户端代码
1、启动grpc连接
conn, err := grpc.DialInsecure(
context.Background(),
grpc.WithMiddleware(
metadata.Client(),
),
grpc.WithEndpoint(net.JoinHostPort(global.BootConfig.Node.ServerHost, global.BootConfig.Node.ServerPort)),
//流式grpc增加中间处理流程,普通grpc无效
grpc.WithStreamInterceptor(JwtStreamClientInterceptor(global.BootConfig.Node.NodeId, global.BootConfig.Node.JwtSecret)),
)
if err != nil {
global.LogUtil.Error(global.LogErrCode, err)
return err
}
c.client = serverPB.NewNodeServerClient(conn)
stream, err := c.client.MessageChannel(ctx)

2、过滤器开发
func JwtStreamClientInterceptor(nodeId, tokenSecret string) grpc.StreamClientInterceptor {
return func(ctx context.Context,
desc *grpc.StreamDesc,
cc *grpc.ClientConn,
method string,
streamer grpc.Streamer,
opts ...grpc.CallOption) (grpc.ClientStream, error) {
// 将 JWT 添加到元数据中
ips, _ := utils.GetLocalIPs()
macs, _ := utils.GetMacAddresses()
agentInfo := dto.AgentInfo{
LocalIp: strings.Join(ips, ""),
Mac: strings.Join(macs, ","),
Id: nodeId,
}
token, err := tokenEncode(agentInfo, tokenSecret)
if err != nil {
global.LogUtil.Error(global.LogErrCode, err)
return nil, err
}
// 使用生成的token替换硬编码的字符串
md := metadata.Pairs("x-md-global-authorization", fmt.Sprintf("Bearer %s", token))

	// 打印md以确认是否包含正确的授权头
	fmt.Printf("Metadata: %v\n", md)
	ctxNew := metadata.NewOutgoingContext(ctx, md)
	return streamer(ctxNew, desc, cc, method, opts...)
}

}

3、服务端启动服务
func NewGRPCServer(c *conf.Bootstrap, nodeService *service.NodeService, logger log.Logger) *grpc.Server {
var opts = []grpc.ServerOption{
grpc.Middleware(
//middlewares.JwtMiddleware(c.Node.JwtSecret),
metadata.Server(),
recovery.Recovery(),
),
grpc.StreamInterceptor(middlewares.JwtStreamInterceptor(c.Node.JwtSecret)),
}
if c.Server.Grpc.Network != "" {
opts = append(opts, grpc.Network(c.Server.Grpc.Network))
}
if c.Server.Grpc.Addr != "" {
opts = append(opts, grpc.Address(c.Server.Grpc.Addr))
}
if c.Server.Grpc.Timeout != nil {
opts = append(opts, grpc.Timeout(c.Server.Grpc.Timeout.AsDuration()))
}
//opts = append(opts, grpc.Middleware())
srv := grpc.NewServer(opts...)
v1.RegisterNodeServerServer(srv, nodeService)
return srv
}
4、服务端过滤器
func JwtStreamInterceptor(jwtSecret string) grpc.StreamServerInterceptor {
return func(
srv interface{},
stream grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
ctx := stream.Context()
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return status.Errorf(codes.InvalidArgument, "missing metadata")
}
j, _ := json.MarshalIndent(md, "", " ")
fmt.Println(string(j))

	return handler(srv, &wrappedServerStream{stream, ctxNew})
}

}
运行起来服务端打印的metadata 是
{
":authority": [
"127.0.0.1:9091"
],
"content-type": [
"application/grpc"
],
"user-agent": [
"grpc-go/1.65.0"
]
}
没有我在客户端添加的数据,如果常规的grpc,是可以成功的,请问这是什么原因

@zzwl0219 zzwl0219 added the question Further information is requested label Aug 23, 2024
@kratos-ci-bot kratos-ci-bot changed the title [Question]grpc的流模式下,对接jwt失败 [Question]In the streaming mode of grpc, docking with jwt failed Aug 23, 2024
@shenqidebaozi
Copy link
Member

Have you ever tried testing directly with gRPC instead of Kratos gRPC.

@zzwl0219
Copy link
Author

Have you ever tried testing directly with gRPC instead of Kratos gRPC.

I didn't test grpc, just use kratos grpc.

@zzwl0219
Copy link
Author

我用原生grpc去实现流模式下的jwt传递参数,服务端可以通过metadata接受到jwt信息
{
":authority": [
"localhost:9021"
],
"content-type": [
"application/grpc"
],
"user-agent": [
"grpc-go/1.65.0"
],
"x-md-global-authorization": [
"Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJkYXRhIjoiTW1adGVMVHIwZmlqLyt6Mzdpd3BGbnNFUWV6cTdJV3MrZXYwOS9hMiIsImV4cCI6MTcyNDgxMTY2OX0.Su9FpFMu6kPFi1D9ItCWQil-96PvPjYWxxhn2eraDgs"
]
}
所以我现在不确定kratos为什么接受不到

@zzwl0219
Copy link
Author

这是我这边的代码,服务端接受不到metadata,代码中包含server和client
server.zip

@kratos-ci-bot
Copy link
Collaborator

Bot detected the issue body's language is not English, translate it automatically. 👯👭🏻🧑‍🤝‍🧑👫🧑🏿‍🤝‍🧑🏻👩🏾‍🤝‍👨🏿👬🏿


This is my code. The server cannot accept metadata. The code contains server and client.
server.zip

@clement2026
Copy link

@zzwl0219
This should fix the issue:

func isHealthCheck(method string) bool {
	return strings.HasPrefix(method, "/grpc.health.v1.Health/")
}

func JwtStreamInterceptor(jwtSecret string) grpc.StreamServerInterceptor {
	return func(
		srv interface{},
		stream grpc.ServerStream,
		info *grpc.StreamServerInfo,
		handler grpc.StreamHandler,
	) error {
                 // HealthCheck requests don't include the "x-md-global-authorization" header
		if isHealthCheck(info.FullMethod) {
			return handler(srv, stream)
		}
		ctx := stream.Context()
		ctxNew, err := JwtParse(ctx, jwtSecret)
		if err != nil {
			return err
		}
		return handler(srv, &wrappedServerStream{stream, ctxNew})
	}
}

Explanation

Kratos sets up a healthcheck server when it starts:

if !srv.customHealth {
grpc_health_v1.RegisterHealthServer(srv.Server, srv.health)
}

When a grpc.ClientConn is created, it initiates a watch stream like any other stream. This stream goes through your JwtStreamInterceptor without the "x-md-global-authorization" header, causing an error that closes the grpc.ClientConn. This happens before the cc.SayHello(context.Background()) request is handled by the server.

image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Further information is requested
Projects
None yet
Development

No branches or pull requests

4 participants