Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #4735] SubscriptionManager enhancement #4736

Open
wants to merge 4,462 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
4462 commits
Select commit Hold shift + click to select a range
8fa63b6
Update feature_request.yml (#4306)
qqeasonchen Jul 31, 2023
77b87f9
Update enhancement_request.yml (#4305)
qqeasonchen Jul 31, 2023
94dbdaa
[ISSUE 4309] Fix http process error (#4310)
Alonexc Aug 1, 2023
a48410c
Update documentation_related.yml (#4311)
qqeasonchen Aug 1, 2023
d19baa5
Merge branch 'apache:master' into developmentEM
devCod3r Aug 1, 2023
7fd6e3d
Update enhancement_request.yml (#4312)
qqeasonchen Aug 1, 2023
3592263
Update feature_request.yml (#4313)
qqeasonchen Aug 1, 2023
dd40227
[ISSUE #4171]support pulsar connector (#4186)
g0715158 Aug 1, 2023
dff6e7a
[ISSUE #4263] InterruptedExceptions should never be ignored[ThreadUti…
harshithasudhakar Aug 2, 2023
289bac6
[ISSUE #4266]InterruptedExceptions should never be ignored[connectors…
Ruhshan Aug 2, 2023
99bcf67
[ISSUE #4093]Repeat code extraction as method.[MessageUtils] (#4315)
Ruhshan Aug 2, 2023
4121332
modify the offset management service
xwm1992 Aug 2, 2023
fef56a5
[ISSUE #4260] Anonymous new ChannelFutureListener() can be replaced w…
sbmvirdi Aug 2, 2023
19dd2b1
[Feature #4032] Fix redis codec (#4301)
fabian4 Aug 3, 2023
939db45
modify the nacos config management service
xwm1992 Aug 4, 2023
8ab4000
modify the configs
xwm1992 Aug 4, 2023
800b628
[ISSUE #4264]Some optimizations for ProducerService (#4334)
847850277 Aug 5, 2023
4a12417
Merge branch 'master' of https://github.com/apache/eventmesh into add…
xwm1992 Aug 7, 2023
4f612f1
fix ci check error
xwm1992 Aug 7, 2023
4eddf36
fix ci check error
xwm1992 Aug 7, 2023
9621900
fix ci check error
xwm1992 Aug 7, 2023
9d67039
[ISSUE #4085] Switching order (#4338)
Kouzola Aug 7, 2023
68c8f23
[ISSUE #4262] Enhancement Request EventMeshCloudEventUtils (#4337)
maxim-zgardan Aug 8, 2023
50bbaec
Merge pull request #4329 from xwm1992/add-offsetStorage
wqliang Aug 8, 2023
97157f8
fix template not taking effect (#4325)
Alonexc Aug 8, 2023
8732dd3
Merge pull request #4298 from devCod3r/developmentEM
lrhkobe Aug 8, 2023
4568c50
Merge pull request #4284 from seriouszyx/yixiang/event-bridge
lrhkobe Aug 8, 2023
d3d9d4f
[ISSUE #4339][Unit Test] improve eventmesh-common protocol unit test …
847850277 Aug 10, 2023
475cb2c
feat: refactor with mongodb plugin.
fabian4 Aug 10, 2023
dec34f3
[ISSUE 4346] Fix start error and some code optimization. (#4347)
Alonexc Aug 11, 2023
8418745
[ISSUE 4350] Fixed eventmesh startup error under MQ plugin (#4351)
Alonexc Aug 11, 2023
a350ffc
[ISSUE #4349][Unit Test] eventmesh-common header unit test. (#4348)
847850277 Aug 11, 2023
dbfb571
[ISSUE #4354] Fix spelling mistake (#4353)
iwangjie Aug 13, 2023
85ee0fb
[ISSUE #4191] Do some code optimization.[StreamPushRequest](#4356)
himansh295 Aug 14, 2023
8be4180
[ISSUE #4345]Fix publish EventMeshMessage without requestCode throw j…
mxsm Aug 14, 2023
857ad6b
[ISSUE #4190] Code optimization for the EventMeshConsumer class.
yanrongzhen Aug 14, 2023
9264a75
[ISSUE #4196] [Enhancement] Dereference of 'subscriptionItems' may pr…
vaibhavarya1622 Aug 14, 2023
5494919
[ISSUE #4335] Fix Webhook connection not closing after processing (#4…
Pil0tXia Aug 14, 2023
dd6cc06
[ISSUE #4203] [Enhancement] Do some code optimization.[EventMeshTcpMe…
vaibhavarya1622 Aug 15, 2023
c29ada6
[ISSUE #4082][Task1] Support S3 file source connector (#4132)
TheR1sing3un Aug 15, 2023
ad7d808
[ISSUE #4363] Do some code optimization.[EventMeshConsumer] (#4362)
yanrongzhen Aug 15, 2023
fe3000c
[ISSUE 4360] Fix unSubscribeUrl cannot be null (#4361)
Alonexc Aug 15, 2023
c271cee
feat: refactor with mongodb plugin.
fabian4 Aug 15, 2023
8d627ec
feat: refactor with mongodb plugin.
fabian4 Aug 16, 2023
f1a01a2
Merge branch 'apache:master' into mongodb_plugin
fabian4 Aug 16, 2023
3bbffb3
feat: refactor with mongodb plugin.
fabian4 Aug 16, 2023
d96804a
feat: refactor with mongodb plugin.
fabian4 Aug 17, 2023
324045c
Merge pull request #4367 from fabian4/mongodb_plugin
wqliang Aug 17, 2023
8f584d5
Merge pull request #4068 from pandaapo/master-issue4067
qqeasonchen Aug 17, 2023
9cf176a
[ISSUE #4366] When listens for remote nacos offset configuration, Con…
xwm1992 Aug 17, 2023
8f13784
feat: refactor with rabbitmq plugin.
fabian4 Aug 17, 2023
182c509
[ISSUE #4366] When listens for remote nacos offset configuration, Con…
xwm1992 Aug 17, 2023
e03d291
[ISSUE #4366] When listens for remote nacos offset configuration, Con…
xwm1992 Aug 17, 2023
92eb957
[ISSUE #4366] When listens for remote nacos offset configuration, Con…
xwm1992 Aug 17, 2023
08b1915
[ISSUE #4379] Enable manually commit offset in rocketmq source connector
xwm1992 Aug 17, 2023
3b7855f
Merge pull request #4380 from xwm1992/fix-4379
wqliang Aug 17, 2023
32e11dd
feat: refactor with rabbitmq plugin.
fabian4 Aug 18, 2023
50d6113
[ISSUE #4062]Implement the method printRetryThreadPoolState() in Even…
pandaapo Aug 21, 2023
beb2e57
feat: refactor with rabbitmq plugin.
fabian4 Aug 22, 2023
3d4e922
[ISSUE #4025] Comparison using reference equality instead of value eq…
AbhishekPSingh07 Aug 22, 2023
403ae9c
feat: refactor with rabbitmq plugin.
fabian4 Aug 22, 2023
a5639bf
feat: refactor with rabbitmq plugin.
fabian4 Aug 22, 2023
3cd5670
Merge branch 'apache:master' into rabbitmq_plugin
fabian4 Aug 22, 2023
d7abc31
[ISSUE #4087] Anonymous new can be replaced with lambda (#4391)
kartiktayal Aug 23, 2023
1babec1
[ISSUE #4027] Comparison using reference equality instead of value eq…
harshithasudhakar Aug 23, 2023
d37bd9e
feat: refactor with rabbitmq plugin.
fabian4 Aug 23, 2023
a53d6b3
Merge remote-tracking branch 'origin/rabbitmq_plugin' into rabbitmq_p…
fabian4 Aug 23, 2023
d7bd581
feat: refactor with rabbitmq plugin.
fabian4 Aug 23, 2023
47d88a0
feat: refactor with rabbitmq plugin.
fabian4 Aug 23, 2023
ac4c5cb
feat: refactor with rabbitmq plugin.
fabian4 Aug 23, 2023
1b717a3
Merge branch 'apache:master' into rabbitmq_plugin
fabian4 Aug 23, 2023
ce4c8bb
Merge remote-tracking branch 'origin/rabbitmq_plugin' into rabbitmq_p…
fabian4 Aug 23, 2023
95ec173
[ISSUE #4388] Improve the rocketmq source connector offset ack (#4389)
xwm1992 Aug 23, 2023
3f1783f
feat: refactor with rabbitmq plugin.
fabian4 Aug 23, 2023
fd1102a
Merge pull request #4393 from fabian4/rabbitmq_plugin
qqeasonchen Aug 24, 2023
5ecbe7b
Update .asf.yaml
qqeasonchen Aug 24, 2023
738e98d
Merge pull request #4399 from apache/qqeasonchen-patch-19
harshithasudhakar Aug 24, 2023
51a1171
[ISSUE #4195] Do some code optimization.[SubscribeProcessor] (#4383)
gautamsagar99 Aug 25, 2023
fcedbbc
[ISSUE #4341] WebHookConfig failed to load after being inserted (#4344)
Pil0tXia Aug 25, 2023
ebd5b5e
[ISSUE 4042] EventMesh Integrated k8s with different components. (#4377)
Alonexc Aug 25, 2023
707c98d
[ISSUE #4390] Realize the SPI extension loading of RabbitMQ admin-api…
Pil0tXia Aug 26, 2023
3678c23
[ISSUE #4398] Use bash to excute shell script and fix mis-output (#4401)
Pil0tXia Aug 26, 2023
b55e6db
[ISSUE #4403] Fix the SPI extension admin-api loading of storage-plug…
Pil0tXia Aug 28, 2023
af53e7a
[ISSUE #549] Unify request handler code style of tcp and http protoco…
hhuang1231 Aug 28, 2023
1e59cdf
[ISSUE #4369] Move Pravega plugin into Connector from Storage plugin…
fabian4 Aug 28, 2023
643f059
[ISSUE #4129]Enhance the functionality of EventMeshExtensionFactory (…
mxsm Aug 29, 2023
eea1d01
Add connector jdbc interface (#4332)
mxsm Aug 29, 2023
fbbb81d
[ISSUE #4403] Fix redis admin spi (#4405)
fabian4 Aug 29, 2023
9e4a0c0
[ISSUE #4406]Optimize Body#buildBody method (#4407)
mxsm Aug 30, 2023
5d390d3
[ISSUE #4374] Redesign registry module into meta storage (#4418)
xwm1992 Aug 30, 2023
c25a89b
Update README.md (#4422)
qqeasonchen Aug 30, 2023
1644984
[ISSUE #4423] Fix the meta-file error in the meta module (#4424)
xwm1992 Aug 30, 2023
3f2fd1c
[ISSUE #4019][Enhancement] Code optimization.[SubStreamHandler] (#4056)
PickBas Aug 31, 2023
da02765
[ISSUE #4277] Add eventmesh-connector-knative (#4428)
fabian4 Sep 6, 2023
4d561b8
[ISSUE #4431]Support update subscription data for http/tcp/grpc proto…
xwm1992 Sep 7, 2023
ba694b4
remove duplicated moudle (#4429)
qqeasonchen Sep 7, 2023
fbf5c81
[ISSUE #4435] Fix license check error (#4436)
xwm1992 Sep 7, 2023
b83663c
update architecture picture (#4437)
xwm1992 Sep 8, 2023
6dfd46d
[ISSUE #4396] Realize the admin function for Kafka storage-plugin (#4…
Pil0tXia Sep 12, 2023
74f9abf
[ISSUE #4409]Support source connector jdbc handle snapshot data for m…
mxsm Sep 13, 2023
ff663e2
[ISSUE #4091]refactor: replace with lambda. use the correct log forma…
storv Sep 15, 2023
e66109c
[ISSUE #4193]Do some code optimization. (#4452)
yanrongzhen Sep 19, 2023
109c493
[ISSUE #4450] Add retry strategy for sourceWorker(#4451)
fabian4 Sep 19, 2023
08fabd2
[ISSUE #4060]Use AbstractRetryer to unify GrpcRetryer, HttpRetryer an…
pandaapo Sep 22, 2023
df07986
[ISSUE #4454] Support File Sink Connector (#4455)
xwm1992 Sep 22, 2023
f8d4023
[ISSUE #4448] Fix meta plugin dist startup failure and upgrade gradle…
Pil0tXia Sep 22, 2023
b4feffc
[ISSUE #4456] startup failure caused by Retryer
pandaapo Sep 25, 2023
a18ff1f
[ISSUE #4089] Anonymous new can be replaced with lambda.[CClientDemo]…
pandalee99 Sep 25, 2023
927bc5b
[ISSUE #4192] Do some code optimization.[StreamTopicConfig] (#4464)
pandalee99 Sep 26, 2023
ce15df9
[ISSUE #4439]Support source connector jdbc handle CDC for mysql (#4457)
mxsm Sep 26, 2023
73af459
[ISSUE #4443] Automatically format code at build time using the Spotl…
yanrongzhen Sep 30, 2023
05900d0
[ISSUE #3397] Make field a static final constant or non-public
Monika-Sivakumar-3 Oct 4, 2023
b6f9ca5
[ISSUE #4275] Support subscription info management with eventmesh-adm…
Pil0tXia Oct 6, 2023
c8b7916
[ISSUE #4472]Fix MeshMessageProtocolAdaptor#fromCloudEvent throw NPE …
mxsm Oct 7, 2023
7508251
[ISSUE #4478] Upgrade JUnit to JUnit Jupiter (#4475)
mureinik Oct 10, 2023
47203c7
[ISSUE #3516] Do some code optimization.[SubClientImpl] (#4480)
yanrongzhen Oct 11, 2023
d2ce6c9
[ISSUE #4024] Change to use equals() to compare object.[QueryOperatio…
yanrongzhen Oct 11, 2023
82e0039
[ISSUE #4026] Comparison using reference equality instead of value eq…
PickBas Oct 13, 2023
d327ff3
[ISSUE #4413] Add spring boot source connector. (#4477)
yanrongzhen Oct 14, 2023
1a35984
[ISSUE #4483] Standardize exception handling in tests (#4484)
mureinik Oct 14, 2023
01cec6e
[ISSUE #4090]replace with lambda. (#4487)
Ricky-Qu Oct 15, 2023
46f7c5b
Avoid unnecessary boxing by using plain == for primitive types (#4485)
Ricky-Qu Oct 15, 2023
093fb6b
[ISSUE #3520]Swap these 2 arguments so they are in the correct order…
Ricky-Qu Oct 15, 2023
e9e0080
[ISSUE #4467]Rust SDK supports the latest gRPC protocol (#4488)
mxsm Oct 17, 2023
53e36b5
[ISSUE #4489] Migrate eventmesh-admin module (#4490)
Pil0tXia Oct 17, 2023
f629730
[ISSUE #4414] Add spring sink connector. (#4491)
yanrongzhen Oct 17, 2023
d2531b2
[ISSUE #4474] Uniformly manage duplicate static variables (#4495)
VishalMCF Oct 20, 2023
1b3e496
[ISSUE#4496] Fix gradle dist error. (#4497)
yanrongzhen Oct 20, 2023
62935c4
[ISSUE #4494] RESTful API framework for EventMeshAdmin (#4498)
Pil0tXia Oct 20, 2023
d3f688d
[ISSUE #4415] Add Promethus source connector (#4493)
willimpo Oct 22, 2023
31b4e96
[ISSUE#4459] Fix unchecked call to a original type member (#4506)
yanrongzhen Oct 24, 2023
f046a2e
[ISSUE #4402] Refactor the retry module with HashedWheelTimer. (#4505)
yanrongzhen Oct 25, 2023
7634404
[ISSUE #4503]Rust sdk use nightly toolchain (#4504)
mxsm Oct 25, 2023
a717d35
[WIP][ISSUE #4043]initial implementation of filter and transform (#4365)
pmupkin Oct 25, 2023
cf16cf6
[ISSUE#4508] Add unit test for SpringSourceConnector. (#4509)
yanrongzhen Oct 25, 2023
ae76d75
[ISSUE#4510] Add test case for SpringSinkConnector. (#4511)
yanrongzhen Oct 26, 2023
f948cc5
[ISSUE #4513]Add eventmesh filter & transform sub module (#4514)
xwm1992 Oct 27, 2023
11def17
[ISSUE #4517]Fix Webhook callback path is not a valid Nacos dataId (#…
Pil0tXia Oct 28, 2023
a091017
[ISSUE#4515] Add test case for ProtocolPluginFactory. (#4516)
yanrongzhen Oct 30, 2023
55865a6
[ISSUE #4502] Optimizing Log Printing Using the LogUtils
scwlkq Oct 31, 2023
2bfc1ae
[ISSUE #4521] A poor naming. (#4524)
Pil0tXia Oct 31, 2023
80c7f88
[ISSUE #4319] C SDK Framework (#4323)
zhurq Nov 1, 2023
2f66f20
[ISSUE #4320] C SDK Support For TCP (#4324)
zhurq Nov 1, 2023
e13ac79
[ISSUE #4321] Add Examples And Configs For C SDK (#4326)
zhurq Nov 1, 2023
20155ed
[ISSUE #4322] C SDK Makefile (#4327)
zhurq Nov 1, 2023
bdacd23
[ISSUE #3906] Do some code optimization.[EventMeshUtil] (#4527)
tooo-bad Nov 1, 2023
4bda079
[ISSUE #3825] Do some code optimization.[TopicMetadata] (#4526)
tooo-bad Nov 1, 2023
40a2aea
[ISSUE #4519]Rust sdk support CloudEvents (#4520)
mxsm Nov 2, 2023
42d6e55
[ISSUE#4536] Add unit test for RocketMQSinkConnector. (#4538)
yanrongzhen Nov 4, 2023
360ed66
[ISSUE #3902] Do some code optimization.[NetUtils] (#4528)
tooo-bad Nov 4, 2023
261223f
[ISSUE#4531] Add unit test for RedisSinkConnector. (#4533)
yanrongzhen Nov 4, 2023
7438a55
[ISSUE #4529] Add sendcallback for springConnector. (#4532)
yanrongzhen Nov 6, 2023
0b6ae9f
[ISSUE #4549]Remove unnecessary transient modifiers. (#4550)
yanrongzhen Nov 7, 2023
fe7256f
[ISSUE#4535] Add unit test for RocketMQSourceConnector. (#4546)
yanrongzhen Nov 7, 2023
c350528
fix make libcurl.a error (#4542)
zhurq Nov 7, 2023
3a580dc
[MINOR] Fix ci check error (#4543)
xwm1992 Nov 9, 2023
a60d46f
Update SendAsyncMessageProcessor.java (#4054)
piyush280599 Nov 12, 2023
9cb1704
[ISSUE #3472]Do some code optimization[HttpProtocolAdaptor] (#3883)
harshithasudhakar Nov 12, 2023
9fdc30a
[ISSUE#4530] Add unit test for RedisSourceConnector. (#4545)
yanrongzhen Nov 12, 2023
013512b
[ISSUE#4552] Add example for spring connector sdk. (#4553)
yanrongzhen Nov 13, 2023
13f4d98
[ISSUE#4178] When there are a large number of errors in the HTTP targ…
yanrongzhen Nov 14, 2023
709b211
[ISSUE #913] Push docker image to apache repo (#4282)
kartiktayal Nov 15, 2023
fc900e5
[ISSUE #4411] Add DingDing sink connnector. (#4557)
yanrongzhen Nov 16, 2023
ebb5ba1
[ISSUE #4559]Update readme.md (#4560)
yanrongzhen Nov 17, 2023
715423c
[ISSUE #4201] optimize the code of metrics (#4563)
G-XD Nov 17, 2023
5a8cd37
[ISSUE #4420] Add Feishu Sink connector (#4522)
SunnyBoy-WYH Nov 19, 2023
ad4ecf3
[ISSUE#4412] Add WeCom sink connector. (#4558)
yanrongzhen Nov 19, 2023
afb3a66
[ISSUE#4419] Add slack sink connector. (#4562)
yanrongzhen Nov 20, 2023
56713fb
fix() : Resolve ambiguity in method names in AbstractTCPServer (#4571)
hhuang1231 Nov 20, 2023
cc36756
[ISSUE#4568] Support adding extensions from ConnectRcord to CloudEven…
hhuang1231 Nov 21, 2023
4c9f0e1
[ISSUE #4547] Add unit test for OpenFunctionSourceConnector. (#4561)
VishalMCF Nov 22, 2023
df288ab
[ISSUE #4564] add url validation to avoid SSRF (#4572)
wizardzhang Nov 23, 2023
007553a
refactor moudle dingding to dingtalk (#4574)
qqeasonchen Nov 23, 2023
4e72b3c
[ISSUE #3399] replace anonymous inner class with lambda (#4575)
wizardzhang Nov 24, 2023
6139147
[ISSUE #4581] http demo rely on tcp runtime (#4583)
hhuang1231 Nov 27, 2023
2fcd091
[ISSUE #4577] Implement FilterEngine for EventMesh Filters (#4578)
xwm1992 Nov 28, 2023
6508df8
[ISSUE#4580] Connector extension supports spring environment variable…
yanrongzhen Nov 28, 2023
97ba401
[ISSUE #4585] The first-interaction ci check error
xwm1992 Nov 30, 2023
5998926
There is no metaStorage plugin of type "namesrv"
VishalMCF Dec 1, 2023
d463fa7
[ISSUE #4410]add wechat sink connector (#4594)
wizardzhang Dec 3, 2023
2a46b82
[ISSUE #4596]Fix SourceWorker#convertRecordToEvent method converts Co…
mxsm Dec 3, 2023
34c803b
[ISSUE #4042] eventmesh operator (#4476)
Alonexc Dec 5, 2023
5d8e0f2
[ISSUE #4619]Fix Grpc request reply can not revice reply message (#4620)
mxsm Dec 7, 2023
4df4803
[ISSUE #4088] Anonymous new can be replaced with lambda.[BroadCastSub…
AXE-00 Dec 7, 2023
a170a79
[ISSUE #4590]WeCom/DingTalk connector extension supports spring envir…
yanrongzhen Dec 7, 2023
8c59c7f
[ISSUE #4621] Implement TransformerEngine for EventMesh Transformer (…
xwm1992 Dec 7, 2023
67b7a9f
[ISSUE #4623]Fix cloudEvents variable names do not support underscore…
mxsm Dec 8, 2023
ced061d
[ISSUE #4625]fix grpc request reply can not work (#4626)
mxsm Dec 8, 2023
ee9ee4c
[ISSUE #4602] when wechat send message api response errcode is not ze…
wizardzhang Dec 8, 2023
1e533d1
[ISSUE #4598] Improve eventmesh-connector-lark (#4599)
hhuang1231 Dec 10, 2023
24873dd
[ISSUE #4631]Optimize the message body of the Java SDK's returned rep…
mxsm Dec 10, 2023
958b041
[ISSUE #4096] InterruptedExceptions should never be ignored in the co…
HarshSawarkar Dec 11, 2023
a531968
[ISSUE #4618] Add HTTP source connector (#4634)
Fungx Dec 12, 2023
87ed40d
[ISSUE #4638] Optimize, translate and sync connetor documents with we…
Pil0tXia Dec 13, 2023
14c545c
[ISSUE #4647] Fix binary package runtime failure.
Alonexc Dec 14, 2023
6115009
[ISSUE #4643]Fix Grpc client can't reconnection when runtime after cr…
mxsm Dec 14, 2023
b85a58c
[ISSUE #4607] Documentation Needed for eventmesh-connector-rabbitmq (…
fabian4 Dec 14, 2023
a77a2fd
feat: Add doc for eventmesh-connector-mongodb (#4649)
fabian4 Dec 15, 2023
f17b82e
[ISSUE #4652]Fix EventMeshGrpcConsumer subscribe webhook send heartBe…
mxsm Dec 15, 2023
be8fcc9
[ISSUE #4656] Documentation Needed for eventmesh-connector-http
Fungx Dec 16, 2023
61e1392
[ISSUE #4658] Add documentation for dingtalk sink connector. (#4660)
yanrongzhen Dec 16, 2023
2fa7017
[ISSUE #4661] Add documentation for slack sink connector. (#4662)
yanrongzhen Dec 16, 2023
8d5b39b
[issue #4663]Add documentation for wecom sink connector. (#4664)
yanrongzhen Dec 16, 2023
d625790
[ISSUE #4659] Fix HTTP source connector stop after receiving an inval…
Fungx Dec 16, 2023
d66ebe6
[ISSUE #4665] Optimize and sync recent new connectors' docs (#4668)
Pil0tXia Dec 17, 2023
65fdabf
[ISSUE #4667] Fix some connectors have not been included in the packa…
mxsm Dec 17, 2023
03aa825
[ISSUE #4671]Update LICENSE and NOTICE (#4672)
mxsm Dec 18, 2023
2b2bf71
[ISSUE #4670] Support Java 11 StartUp and Docker deployment (#4675)
Pil0tXia Dec 19, 2023
e056d7a
[ISSUE #4630] Fix concurrency problem and split task handle threadpo…
lrhkobe Dec 19, 2023
2989941
[ISSUE #4677]netty codec enhancement (#4678)
karsonto Dec 22, 2023
1a381ed
[ISSUE #4683] Documentation Needed for eventmesh-connector-redis (#4684)
fabian4 Dec 22, 2023
fe7bf33
[ISSUE #4690] Fix Kafka Connector failed to start (#4691)
Pil0tXia Dec 28, 2023
bd2aeb9
[ISSUE #4688] Capture build scans on ge.apache.org to benefit from de…
clayburn Dec 28, 2023
3642be0
[ISSUE #4695] Fix tcp client not thread safe (#4696)
karsonto Jan 3, 2024
0be4c76
[ISSUE #1054] Build caching optimizations (#4689)
samotleriche Jan 3, 2024
e507497
[ISSUE #4712]Enhancement for NacosMetaService (#4713)
karsonto Jan 3, 2024
dbd96ed
[ISSUE #4714] Migrate eventmesh-admin module to eventmesh-dashboard repo
Pil0tXia Jan 4, 2024
196cd11
[ISSUE #4703]Update project version to 1.10.0-release (#4704)
mxsm Jan 4, 2024
b3f5375
[ISSUE #4548]Added tests for OpenFunctionSinkConnector (#4717)
VishalMCF Jan 4, 2024
8ebc984
[ISSUE #4723] Fix get meta from nacos return 403 (#4724)
karsonto Jan 5, 2024
c3326c0
[ISSUE #4721]Use gradle-build-action for caching and build scan link …
clayburn Jan 6, 2024
a43357c
[ISSUE #4686] Shell scripts should preserve LF line endings (#4687)
Pil0tXia Jan 6, 2024
8eb3d45
[ISSUE #4705] Remove redundant Gradle build task 'jar' (#4706)
Pil0tXia Jan 6, 2024
1ad5fdf
[ISSUE #4707]Update rust sdk toolchain (#4708)
mxsm Jan 8, 2024
a1fce2f
[ISSUE #4654]Get Ip address problem when run example code (#4655)
karsonto Jan 8, 2024
1f0bf0c
[ISSUE #4701]Fix use tcp protocol client send message, it throw a Dec…
mxsm Jan 10, 2024
5850d54
[ISSUE #4711] Properly close resources used by WatchFileManagerTest (…
tylerbertrand Jan 10, 2024
21731e8
[ISSUE #4728] Deploy eventmesh-operator on k8s cluster. (#4727)
Alonexc Jan 11, 2024
aae0d54
[ISSUE #4697] Use Fluent Logging API to provide accurate, concise, an…
Pil0tXia Jan 12, 2024
248be85
[ISSUE #4734] Use the static final modifier to decorate Logger (#4741)
MovieTone Jan 13, 2024
8c60e3e
[ISSUE #4737] Separate codeql workflow (#4740)
clayburn Jan 14, 2024
e837f34
[ISSUE #4710] Rewrite the quick start guide for the home page. (#4709)
Alonexc Jan 14, 2024
ad2a41c
[ISSUE #4694] Component initialization order adjustment, add resource…
Alonexc Jan 18, 2024
21e4c3a
[ISSUE #4635] Implemented the functions of file source connector (#4650)
HarshSawarkar Jan 20, 2024
39c8293
[ISSUE #4095] Code Optimization and Interrupeted Exception handling.[…
scwlkq Jan 20, 2024
1d30733
[ISSUE #4733] Substitute e.printStackTrace() with log.error() (#4754)
scwlkq Jan 21, 2024
506838e
[ISSUE #3822]Enhance thread handling of InterruptedException (#4756)
scwlkq Jan 23, 2024
73ab1d6
[ISSUE #4725] Update issue template and remove deprecated connector d…
Pil0tXia Jan 24, 2024
032ff80
[ISSUE #4633] Optimize the message body of the Rust SDK's returned re…
kyooosukedn Jan 26, 2024
8bb8b8f
[ISSUE #3903] Unwritten public or protected field [HeartbeatRequestBo…
AaronGhebretinsae Jan 26, 2024
378aa75
[ISSUE #3010] Method manually handles closing an auto-closeable resou…
arsenalzp Jan 26, 2024
28c746c
[ISSUE #4761] Update copyright year to 2024 (#4761) (#4762)
orol116 Feb 1, 2024
648f3e9
[ISSUE #4750] Replace sun.net.httpserver.HttpServer to use netty serv…
karsonto Feb 4, 2024
36888b8
Update README.md (#4772)
qqeasonchen Feb 18, 2024
d6393ab
[ISSUE #4458] Support MySQL Sink Connector feature (#4771)
mxsm Feb 18, 2024
9a3912a
[ISSUE #4731] HttpRequestProcessor enhancement (#4732)
karsonto Feb 18, 2024
363b5e0
resolve conflict
karsonto Feb 2, 2024
357e021
SubscriptionManager enhancement
karsonto Jan 10, 2024
fed8ab3
client enhancement
karsonto Jan 10, 2024
0d9e9d6
client enhancement
karsonto Jan 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
resolve conflict
karsonto committed Feb 19, 2024
commit 363b5e0cb6cd809aa5e07d82a28e73ff22be1847
Original file line number Diff line number Diff line change
@@ -36,6 +36,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

import lombok.extern.slf4j.Slf4j;

@@ -56,6 +57,8 @@ public class SubscriptionManager {
*/
private final ConcurrentHashMap<String, List<Client>> localClientInfoMapping = new ConcurrentHashMap<>(64);

private final ReentrantLock lock = new ReentrantLock();

public SubscriptionManager(boolean isEventMeshServerMetaStorageEnable, MetaStorage metaStorage) {
this.isEventMeshServerMetaStorageEnable = isEventMeshServerMetaStorageEnable;
this.metaStorage = metaStorage;
@@ -71,82 +74,87 @@ public ConcurrentHashMap<String, List<Client>> getLocalClientInfoMapping() {

public void registerClient(final ClientInfo clientInfo, final String consumerGroup,
final List<SubscriptionItem> subscriptionItems, final String url) {
for (final SubscriptionItem subscription : subscriptionItems) {
final String groupTopicKey = consumerGroup + "@" + subscription.getTopic();

List<Client> localClients = localClientInfoMapping.get(groupTopicKey);

if (localClients == null) {
localClientInfoMapping.putIfAbsent(groupTopicKey, new ArrayList<>());
localClients = localClientInfoMapping.get(groupTopicKey);
}

boolean isContains = false;
for (final Client localClient : localClients) {
// TODO: compare the whole Client would be better?
if (StringUtils.equals(localClient.getUrl(), url)) {
isContains = true;
localClient.setLastUpTime(new Date());
break;
lock.lock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thread-safety issue with the method is brought about by the global variable. Could you explain where there is a thread safety issue with the way the original method uses ConcurrentHashMap?

该方法的线程安全问题是由全局变量带来的。能否解释下原方法对ConcurrentHashMap的使用方式哪里存在线程安全问题?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个改动主要消除了SubscribeProcessor 使用 SubscriptionManager synchronized 方法,保证了SubscriptionManager方法调用是线程安全的,是线程安全的类

try {
for (final SubscriptionItem subscription : subscriptionItems) {
final String groupTopicKey = consumerGroup + "@" + subscription.getTopic();
List<Client> localClients = localClientInfoMapping.computeIfAbsent(groupTopicKey, (groupTopicKeyInner) ->
new ArrayList<>()
);
boolean isContains = false;
for (final Client localClient : localClients) {
// TODO: compare the whole Client would be better?
if (StringUtils.equals(localClient.getUrl(), url)) {
isContains = true;
localClient.setLastUpTime(new Date());
break;
}
}
}

if (!isContains) {
Client client = new Client();
client.setEnv(clientInfo.getEnv());
client.setIdc(clientInfo.getIdc());
client.setSys(clientInfo.getSys());
client.setIp(clientInfo.getIp());
client.setPid(clientInfo.getPid());
client.setConsumerGroup(consumerGroup);
client.setTopic(subscription.getTopic());
client.setUrl(url);
client.setLastUpTime(new Date());
localClients.add(client);
if (!isContains) {
Client client = new Client();
client.setEnv(clientInfo.getEnv());
client.setIdc(clientInfo.getIdc());
client.setSys(clientInfo.getSys());
client.setIp(clientInfo.getIp());
client.setPid(clientInfo.getPid());
client.setConsumerGroup(consumerGroup);
client.setTopic(subscription.getTopic());
client.setUrl(url);
client.setLastUpTime(new Date());
localClients.add(client);
}
}
} finally {
lock.unlock();
}
}

public void updateSubscription(ClientInfo clientInfo, String consumerGroup,
String url, List<SubscriptionItem> subscriptionList) {
for (final SubscriptionItem subscription : subscriptionList) {
final List<Client> groupTopicClients = localClientInfoMapping
.get(consumerGroup + "@" + subscription.getTopic());
lock.lock();
try {
for (final SubscriptionItem subscription : subscriptionList) {
final List<Client> groupTopicClients = localClientInfoMapping
.get(consumerGroup + "@" + subscription.getTopic());

if (CollectionUtils.isEmpty(groupTopicClients)) {
log.error("group {} topic {} clients is empty", consumerGroup, subscription);
}
if (CollectionUtils.isEmpty(groupTopicClients)) {
log.error("group {} topic {} clients is empty", consumerGroup, subscription);
}

ConsumerGroupConf consumerGroupConf = localConsumerGroupMapping.get(consumerGroup);
if (consumerGroupConf == null) {
// new subscription
ConsumerGroupConf prev = localConsumerGroupMapping.putIfAbsent(consumerGroup, new ConsumerGroupConf(consumerGroup));
if (prev == null) {
log.info("add new subscription, consumer group: {}", consumerGroup);
ConsumerGroupConf consumerGroupConf = localConsumerGroupMapping.get(consumerGroup);
if (consumerGroupConf == null) {
// new subscription
ConsumerGroupConf prev = localConsumerGroupMapping.putIfAbsent(consumerGroup, new ConsumerGroupConf(consumerGroup));
if (prev == null) {
log.info("add new subscription, consumer group: {}", consumerGroup);
}
consumerGroupConf = localConsumerGroupMapping.get(consumerGroup);
}
consumerGroupConf = localConsumerGroupMapping.get(consumerGroup);
}

ConsumerGroupTopicConf consumerGroupTopicConf = consumerGroupConf.getConsumerGroupTopicConf()
.get(subscription.getTopic());
if (consumerGroupTopicConf == null) {
consumerGroupConf.getConsumerGroupTopicConf().computeIfAbsent(subscription.getTopic(), (topic) -> {
ConsumerGroupTopicConf newTopicConf = new ConsumerGroupTopicConf();
newTopicConf.setConsumerGroup(consumerGroup);
newTopicConf.setTopic(topic);
newTopicConf.setSubscriptionItem(subscription);
log.info("add new {}", newTopicConf);
return newTopicConf;
});
consumerGroupTopicConf = consumerGroupConf.getConsumerGroupTopicConf().get(subscription.getTopic());
}
ConsumerGroupTopicConf consumerGroupTopicConf = consumerGroupConf.getConsumerGroupTopicConf()
.get(subscription.getTopic());
if (consumerGroupTopicConf == null) {
consumerGroupConf.getConsumerGroupTopicConf().computeIfAbsent(subscription.getTopic(), (topic) -> {
ConsumerGroupTopicConf newTopicConf = new ConsumerGroupTopicConf();
newTopicConf.setConsumerGroup(consumerGroup);
newTopicConf.setTopic(topic);
newTopicConf.setSubscriptionItem(subscription);
log.info("add new {}", newTopicConf);
return newTopicConf;
});
consumerGroupTopicConf = consumerGroupConf.getConsumerGroupTopicConf().get(subscription.getTopic());
}

consumerGroupTopicConf.getUrls().add(url);
if (!consumerGroupTopicConf.getIdcUrls().containsKey(clientInfo.getIdc())) {
consumerGroupTopicConf.getIdcUrls().putIfAbsent(clientInfo.getIdc(), new ArrayList<>());
consumerGroupTopicConf.getUrls().add(url);
if (!consumerGroupTopicConf.getIdcUrls().containsKey(clientInfo.getIdc())) {
consumerGroupTopicConf.getIdcUrls().putIfAbsent(clientInfo.getIdc(), new ArrayList<>());
}
// TODO: idcUrl list is not thread-safe
consumerGroupTopicConf.getIdcUrls().get(clientInfo.getIdc()).add(url);
}
// TODO: idcUrl list is not thread-safe
consumerGroupTopicConf.getIdcUrls().get(clientInfo.getIdc()).add(url);
} finally {
lock.unlock();
}
}

Original file line number Diff line number Diff line change
@@ -155,33 +155,32 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final
return;
}

synchronized (eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()) {
ClientInfo clientInfo = getClientInfo(requestWrapper);
SubscriptionManager subscriptionManager = eventMeshHTTPServer.getSubscriptionManager();
subscriptionManager.registerClient(clientInfo, consumerGroup, subscriptionList, url);
subscriptionManager.updateSubscription(clientInfo, consumerGroup, url, subscriptionList);

final long startTime = System.currentTimeMillis();
try {
// subscription relationship change notification
eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup,
eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().get(consumerGroup));
responseBodyMap.put(EventMeshConstants.RET_CODE, EventMeshRetCode.SUCCESS.getRetCode());
responseBodyMap.put(EventMeshConstants.RET_MSG, EventMeshRetCode.SUCCESS.getErrMsg());

handlerSpecific.sendResponse(responseHeaderMap, responseBodyMap);

} catch (Exception e) {
log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|url={}",
System.currentTimeMillis() - startTime, JsonUtils.toJSONString(subscriptionList), url, e);

handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR, responseHeaderMap, responseBodyMap, null);
}
ClientInfo clientInfo = getClientInfo(requestWrapper);
SubscriptionManager subscriptionManager = eventMeshHTTPServer.getSubscriptionManager();
subscriptionManager.registerClient(clientInfo, consumerGroup, subscriptionList, url);
subscriptionManager.updateSubscription(clientInfo, consumerGroup, url, subscriptionList);

final long startTime = System.currentTimeMillis();
try {
// subscription relationship change notification
eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup,
eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().get(consumerGroup));
responseBodyMap.put(EventMeshConstants.RET_CODE, EventMeshRetCode.SUCCESS.getRetCode());
responseBodyMap.put(EventMeshConstants.RET_MSG, EventMeshRetCode.SUCCESS.getErrMsg());

// Update service metadata
eventMeshHTTPServer.getSubscriptionManager().updateMetaData();
handlerSpecific.sendResponse(responseHeaderMap, responseBodyMap);

} catch (Exception e) {
log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|url={}",
System.currentTimeMillis() - startTime, JsonUtils.toJSONString(subscriptionList), url, e);

handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR, responseHeaderMap, responseBodyMap, null);
}

// Update service metadata
eventMeshHTTPServer.getSubscriptionManager().updateMetaData();


}

@Override
@@ -192,6 +191,7 @@ public String[] paths() {
@Override
public Executor executor() {
return eventMeshHTTPServer.getHttpThreadPoolGroup().getClientManageExecutor();

}

private ClientInfo getClientInfo(final HttpEventWrapper requestWrapper) {
Original file line number Diff line number Diff line change
@@ -155,46 +155,44 @@ public void processRequest(final ChannelHandlerContext ctx, final AsyncContext<H
}

SubscriptionManager subscriptionManager = eventMeshHTTPServer.getSubscriptionManager();
synchronized (subscriptionManager.getLocalClientInfoMapping()) {
ClientInfo clientInfo = getClientInfo(subscribeRequestHeader);
subscriptionManager.registerClient(clientInfo, consumerGroup, subTopicList, url);
subscriptionManager.updateSubscription(clientInfo, consumerGroup, url, subTopicList);

final long startTime = System.currentTimeMillis();
HttpSummaryMetrics summaryMetrics = eventMeshHTTPServer.getMetrics().getSummaryMetrics();
try {
// subscription relationship change notification
eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup,
subscriptionManager.getLocalConsumerGroupMapping().get(consumerGroup));

final CompleteHandler<HttpCommand> handler = httpCommand -> {
try {
log.debug("{}", httpCommand);
eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());

summaryMetrics.recordHTTPReqResTimeCost(System.currentTimeMillis() - request.getReqTime());
} catch (Exception ex) {
log.error("onResponse error", ex);
}
};

responseEventMeshCommand = request.createHttpCommandResponse(EventMeshRetCode.SUCCESS);
asyncContext.onComplete(responseEventMeshCommand, handler);
} catch (Exception e) {
completeResponse(request, asyncContext, subscribeResponseHeader,
EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR,
EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2),
SubscribeResponseBody.class);
final long endTime = System.currentTimeMillis();

log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
endTime - startTime, JsonUtils.toJSONString(subscribeRequestBody.getTopics()), subscribeRequestBody.getUrl(), e);
ClientInfo clientInfo = getClientInfo(subscribeRequestHeader);
subscriptionManager.registerClient(clientInfo, consumerGroup, subTopicList, url);
subscriptionManager.updateSubscription(clientInfo, consumerGroup, url, subTopicList);

final long startTime = System.currentTimeMillis();
HttpSummaryMetrics summaryMetrics = eventMeshHTTPServer.getMetrics().getSummaryMetrics();
try {
// subscription relationship change notification
eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup,
subscriptionManager.getLocalConsumerGroupMapping().get(consumerGroup));

final CompleteHandler<HttpCommand> handler = httpCommand -> {
try {
log.debug("{}", httpCommand);
eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());

summaryMetrics.recordHTTPReqResTimeCost(System.currentTimeMillis() - request.getReqTime());
} catch (Exception ex) {
log.error("onResponse error", ex);
}
};

responseEventMeshCommand = request.createHttpCommandResponse(EventMeshRetCode.SUCCESS);
asyncContext.onComplete(responseEventMeshCommand, handler);
} catch (Exception e) {
completeResponse(request, asyncContext, subscribeResponseHeader,
EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR,
EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2),
SubscribeResponseBody.class);
final long endTime = System.currentTimeMillis();
log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
endTime - startTime, JsonUtils.toJSONString(subscribeRequestBody.getTopics()), subscribeRequestBody.getUrl(), e);
summaryMetrics.recordSendMsgFailed();
summaryMetrics.recordSendMsgCost(endTime - startTime);

summaryMetrics.recordSendMsgFailed();
summaryMetrics.recordSendMsgCost(endTime - startTime);
}
eventMeshHTTPServer.getSubscriptionManager().updateMetaData();
}
eventMeshHTTPServer.getSubscriptionManager().updateMetaData();

}

@Override