Problems encountered when running cache-based ECDH_OPRF_UB_PSI #134

Closed
winnylyc opened this issue May 23, 2024 · 13 comments

@winnylyc

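Hello, sorry to bother you.
I ran into a problem while trying to implement cache-based ECDH_OPRF_UB_PSI.
My approach is to use ECDH_OPRF_UB_PSI_2PC_GEN_CACHE as the offline phase and then ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE as the online phase. Running ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE currently fails with an error. Below are the code run by each party and the errors it produces.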
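For readers unfamiliar with this protocol family, the following is a toy model of the general idea behind ECDH-OPRF-based unbalanced PSI. The server evaluates an OPRF F_k(x) = H(x)^k over its large set once and keeps the results as a reusable cache, which in the unbalanced setting is assumed to have been shipped to the client beforehand. Per query, the client blinds H(y) with a random exponent r, the server raises the blinded value to k, and the client strips r and looks the result up in the cache. Assumptions, stated loudly: arithmetic modulo the Mersenne prime 2**127 - 1 stands in for the FourQ curve, SHA-256 reduced mod p stands in for hash-to-curve, and every function name is hypothetical. This is not SPU's implementation, it is not secure, and how SPU splits the work into GEN_CACHE, TRANSFER_CACHE and later steps is not modelled here.

```python
import hashlib
import math
import secrets

# Toy group (assumption): integers modulo the Mersenne prime 2**127 - 1 stand in
# for the FourQ curve. Insecure; only illustrates the protocol flow.
P = 2**127 - 1
ORDER = P - 1  # order of the multiplicative group Z_P^*


def hash_to_group(item: str) -> int:
    """Toy hash-to-group: SHA-256 reduced into [1, P - 1]."""
    digest = int.from_bytes(hashlib.sha256(item.encode()).digest(), "big")
    return 1 + digest % (P - 1)


def random_invertible_exponent() -> int:
    """Random exponent coprime to ORDER, so x -> x**e is a bijection on Z_P^*."""
    while True:
        e = secrets.randbelow(ORDER - 2) + 2
        if math.gcd(e, ORDER) == 1:
            return e


def gen_cache(server_items, key):
    """Server evaluates F_k(x) = H(x)^k once over its (large) set."""
    return {pow(hash_to_group(x), key, P) for x in server_items}


def blind(item):
    """Client hides H(y) behind a fresh random exponent r."""
    r = random_invertible_exponent()
    return r, pow(hash_to_group(item), r, P)


def evaluate(blinded, key):
    """Server applies its secret key to the blinded value: H(y)^(r*k)."""
    return pow(blinded, key, P)


def unblind(evaluated, r):
    """Client removes r: (H(y)^(r*k))^(r^-1 mod ORDER) = H(y)^k."""
    return pow(evaluated, pow(r, -1, ORDER), P)


def toy_ub_psi(server_items, client_items):
    key = random_invertible_exponent()    # server's long-term OPRF key
    cache = gen_cache(server_items, key)  # computed once, reusable across queries
    hits = []
    for y in client_items:
        r, blinded = blind(y)
        if unblind(evaluate(blinded, key), r) in cache:
            hits.append(y)
    return hits
```

For example, toy_ub_psi(["alice", "bob", "carol"], ["bob", "eve"]) should return ["bob"] (a false match would require a SHA-256 collision modulo P). The point of the design is that the expensive per-item work on the large set happens once, while each client query costs only a constant number of exponentiations on both sides. Requires Python 3.8+ for pow(r, -1, ORDER).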
Sender (Server):

import secretflow as sf
import spu
import time

cluster_config = {
    'parties' : {
        'alice': {
            'address': '127.0.0.1:59179',
            'listen_addr': '0.0.0.0:59179'
        },
        'bob': {
            'address': '127.0.0.1:53341',
            'listen_addr': '0.0.0.0:53341'
        }
    },
    'self_party': 'alice'  # this script runs as alice (its log shows CURRENT_PARTY_NAME 'alice')
}
sf.shutdown
sf.init(address='local', cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {
            "party": "alice",
            "address": "127.0.0.1:45413"
        },
        {
            "party": "bob",
            "address": "127.0.0.1:47480"
        },
    ],
    "runtime_config": {
        "protocol": spu.spu_pb2.SEMI2K,
        "field": spu.spu_pb2.FM128
    },
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    }
)

alice, bob = sf.PYU('alice'), sf.PYU('bob')
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:'/root/project/psi1/alice_exactpsi_1e6_unique.csv', bob:'/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique.csv'}, 
    output_path={alice:'/root/project/psi1/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv', bob:'/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv'}, 
    receiver='bob', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_GEN_CACHE', 
    preprocess_path='preprocess_cache',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete offline phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:'/root/project/psi1/alice_exactpsi_1e6_unique.csv', bob:'/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique.csv'}, 
    output_path={alice:'/root/project/psi1/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv', bob:'/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv'}, 
    receiver='bob', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE', 
    preprocess_path='preprocess_cache',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete online phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")

Output:

/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
2024-05-23 15:49:58,963 INFO worker.py:1621 -- Started a local Ray instance.
2024-05-23 15:49:59.719 INFO api.py:233 [alice] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': '0.0.0.0:59179', 'bob': '127.0.0.1:53341'}, 'CURRENT_PARTY_NAME': 'alice', 'TLS_CONFIG': {}}
2024-05-23 15:50:00.424 INFO barriers.py:284 [alice] -- [Anonymous_job] Succeeded to create receiver proxy actor.
(ReceiverProxyActor pid=19280) 2024-05-23 15:50:00.419 INFO grpc_proxy.py:359 [alice] -- [Anonymous_job] ReceiverProxy binding port 59179, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"]}}]}'))...
(ReceiverProxyActor pid=19280) 2024-05-23 15:50:00.422 INFO grpc_proxy.py:379 [alice] -- [Anonymous_job] Successfully start Grpc service without credentials.
2024-05-23 15:50:01.120 INFO barriers.py:333 [alice] -- [Anonymous_job] SenderProxyActor has successfully created.
2024-05-23 15:50:01.121 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 0 attemp, up to 3600 attemps.
2024-05-23 15:50:04.124 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 1 attemp, up to 3600 attemps.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Complete offline phase
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
(SPURuntime(device_id=None, party=alice) pid=19752) [2024-05-23 15:50:08.337] [info] [launch.cc:164] LEGACY PSI config: {"psi_type":"ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE","receiver_rank":1,"input_params":{"path":"/root/project/psi1/alice_exactpsi_1e6_unique.csv","select_fields":["name"],"precheck":true},"output_params":{"path":"/root/project/psi1/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv","need_sort":true},"curve_type":"CURVE_FOURQ","bucket_size":1048576}
(SPURuntime(device_id=None, party=alice) pid=19752) [2024-05-23 15:50:08.337] [info] [bucket_psi.cc:400] bucket size set to 1048576
(SPURuntime(device_id=None, party=alice) pid=19752) [2024-05-23 15:50:08.337] [info] [bucket_psi.cc:425] Run psi protocol=8, self_items_count=0
(SPURuntime(device_id=None, party=alice) pid=19752) [2024-05-23 15:50:08.338] [info] [bucket_ub_psi.cc:93] input file path:/root/project/psi1/alice_exactpsi_1e6_unique.csv
(SPURuntime(device_id=None, party=alice) pid=19752) [2024-05-23 15:50:08.338] [info] [bucket_ub_psi.cc:94] output file path:/root/project/psi1/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv
(SPURuntime(device_id=None, party=alice) pid=19752) [2024-05-23 15:50:08.338] [info] [ecdh_oprf_selector.cc:76] use fourq
Traceback (most recent call last):
  File "/root/project/psi1/sf_connect_ub.py", line 60, in <module>
    spu.psi_csv(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1848, in psi_csv
    return dispatch(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 111, in dispatch
    return _registrar.dispatch(self.device_type, name, self, *args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 80, in dispatch
    return self._ops[device_type][name](*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/kernels/spu.py", line 321, in psi_csv
    return sfd.get(res)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/distributed/primitive.py", line 156, in get
    return fed.get(object_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 602, in get
    values = ray.get(ray_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/worker.py", line 2524, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(MemoryError): ray::SPURuntime.psi_csv() (pid=19752, ip=192.168.15.7, actor_id=8c7c48f35d3db7ae1ba590c601000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
MemoryError: std::bad_alloc
2024-05-23 15:50:08.424 WARNING cleanup.py:154 [alice] -- [Anonymous_job] Failed to send ObjectRef(85748392bcd969ccc2dc0ecdcc67afbe6255b5ff0100000001000000) to bob, error: ray::SenderProxyActor.send() (pid=19322, ip=192.168.15.7, actor_id=c2dc0ecdcc67afbe6255b5ff01000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7fb88c2b5ed0>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.psi_csv() (pid=19752, ip=192.168.15.7, actor_id=8c7c48f35d3db7ae1ba590c601000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
MemoryError: std::bad_alloc,upstream_seq_id: 8#0, downstream_seq_id: 10.
2024-05-23 15:50:08.424 INFO cleanup.py:161 [alice] -- [Anonymous_job] Sending error std::bad_alloc to bob.
2024-05-23 15:50:08.425 WARNING cleanup.py:127 [alice] -- [Anonymous_job] Signal SIGINT to exit.
2024-05-23 15:50:08.425 WARNING api.py:60 [alice] -- [Anonymous_job] Stop signal received (e.g. via SIGINT/Ctrl+C), try to shutdown fed. Press CTRL+C (or send SIGINT/SIGKILL/SIGTERM) to skip.
2024-05-23 15:50:08.426 WARNING api.py:325 [alice] -- [Anonymous_job] Shutdowning rayfed unintendedly...
2024-05-23 15:50:08.426 ERROR api.py:330 [alice] -- [Anonymous_job] Cross-silo sending error occured. ray::SenderProxyActor.send() (pid=19322, ip=192.168.15.7, actor_id=c2dc0ecdcc67afbe6255b5ff01000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7fb88c2b5ed0>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.psi_csv() (pid=19752, ip=192.168.15.7, actor_id=8c7c48f35d3db7ae1ba590c601000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
MemoryError: std::bad_alloc
2024-05-23 15:50:08.426 INFO api.py:337 [alice] -- [Anonymous_job] No wait for data sending.
2024-05-23 15:50:08.427 INFO message_queue.py:70 [alice] -- [Anonymous_job] Notify message polling thread[DataSendingQueueThread] to exit.
2024-05-23 15:50:08.427 INFO message_queue.py:70 [alice] -- [Anonymous_job] Notify message polling thread[ErrorSendingQueueThread] to exit.
2024-05-23 15:50:08.427 INFO api.py:352 [alice] -- [Anonymous_job] Shutdowned rayfed.
2024-05-23 15:50:08.427 CRITICAL api.py:356 [alice] -- [Anonymous_job] Exit now due to the previous error.
Exception ignored in: <module 'threading' from '/root/anaconda3/envs/psi/lib/python3.10/threading.py'>
Traceback (most recent call last):
  File "/root/anaconda3/envs/psi/lib/python3.10/threading.py", line 1567, in _shutdown
    lock.acquire()
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 65, in _signal_handler
    _shutdown(intended=False)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 357, in _shutdown
    sys.exit(1)
SystemExit: 1

Receiver (Client):

import secretflow as sf
import spu
import time

cluster_config = {
    'parties' : {
        'alice': {
            'address': '127.0.0.1:59179',
            'listen_addr': '0.0.0.0:59179'
        },
        'bob': {
            'address': '127.0.0.1:53341',
            'listen_addr': '0.0.0.0:53341'
        }
    },
    'self_party': 'bob'
}
sf.shutdown
sf.init(address='local', cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {
            "party": "alice",
            "address": "127.0.0.1:45413"
        },
        {
            "party": "bob",
            "address": "127.0.0.1:47480"
        },
    ],
    "runtime_config": {
        "protocol": spu.spu_pb2.SEMI2K,
        "field": spu.spu_pb2.FM128
    },
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    }
)

alice, bob = sf.PYU('alice'), sf.PYU('bob')
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:'/root/project/psi1/alice_exactpsi_1e6_unique.csv', bob:'/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique.csv'}, 
    output_path={alice:'/root/project/psi1/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv', bob:'/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv'}, 
    receiver='bob', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_GEN_CACHE', 
    preprocess_path='preprocess_cache',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete offline phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:'/root/project/psi1/alice_exactpsi_1e6_unique.csv', bob:'/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique.csv'}, 
    output_path={alice:'/root/project/psi1/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv', bob:'/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv'}, 
    receiver='bob', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE', 
    preprocess_path='preprocess_cache',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete online phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
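The sender and receiver scripts above are meant to differ only in self_party (the sender's log reports CURRENT_PARTY_NAME 'alice'), so any copy-paste drift between the two files is easy to miss. One way to avoid that is to build cluster_config from a single shared layout and pass the party name in. A minimal sketch: the addresses are copied from the scripts above, while the helper name cluster_config_for is hypothetical.

```python
import copy

# Shared cluster layout copied from the scripts above; only self_party differs.
BASE_CLUSTER_CONFIG = {
    'parties': {
        'alice': {'address': '127.0.0.1:59179', 'listen_addr': '0.0.0.0:59179'},
        'bob': {'address': '127.0.0.1:53341', 'listen_addr': '0.0.0.0:53341'},
    },
}


def cluster_config_for(party: str) -> dict:
    """Return a cluster_config for sf.init with self_party set to `party`."""
    if party not in BASE_CLUSTER_CONFIG['parties']:
        raise ValueError(f"unknown party: {party}")
    # Deep copy so callers can modify their config without touching the shared layout.
    config = copy.deepcopy(BASE_CLUSTER_CONFIG)
    config['self_party'] = party
    return config
```

Each script would then call sf.init(address='local', cluster_config=cluster_config_for('alice')) (or 'bob'), for example with the party name taken from sys.argv[1], so that the rest of the file can be identical for both parties.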

Output:

/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
2024-05-23 15:50:01,526 INFO worker.py:1621 -- Started a local Ray instance.
2024-05-23 15:50:02.104 INFO api.py:233 [bob] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': '127.0.0.1:59179', 'bob': '0.0.0.0:53341'}, 'CURRENT_PARTY_NAME': 'bob', 'TLS_CONFIG': {}}
2024-05-23 15:50:02.743 INFO barriers.py:284 [bob] -- [Anonymous_job] Succeeded to create receiver proxy actor.
(ReceiverProxyActor pid=19639) 2024-05-23 15:50:02.740 INFO grpc_proxy.py:359 [bob] -- [Anonymous_job] ReceiverProxy binding port 53341, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"]}}]}'))...
(ReceiverProxyActor pid=19639) 2024-05-23 15:50:02.742 INFO grpc_proxy.py:379 [bob] -- [Anonymous_job] Successfully start Grpc service without credentials.
2024-05-23 15:50:03.339 INFO barriers.py:333 [bob] -- [Anonymous_job] SenderProxyActor has successfully created.
2024-05-23 15:50:03.339 INFO barriers.py:520 [bob] -- [Anonymous_job] Try ping ['alice'] at 0 attemp, up to 3600 attemps.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Complete offline phase
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.320] [info] [launch.cc:164] LEGACY PSI config: {"psi_type":"ECDH_OPRF_UB_PSI_2PC_GEN_CACHE","receiver_rank":1,"input_params":{"path":"/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique.csv","select_fields":["name"],"precheck":true},"output_params":{"path":"/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv","need_sort":true},"curve_type":"CURVE_FOURQ","bucket_size":1048576,"ecdh_secret_key_path":"/root/project/psi1/alice_oprf_key"}
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.320] [info] [bucket_psi.cc:425] Run psi protocol=7, self_items_count=0
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.321] [info] [bucket_ub_psi.cc:93] input file path:/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique.csv
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.321] [info] [bucket_ub_psi.cc:94] output file path:/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.321] [info] [ecdh_oprf_selector.cc:33] use fourq
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.321] [info] [batch_provider.cc:328] ReadAndShuffle start, idx:0, provider_batch_size:1048576
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.321] [info] [batch_provider.cc:350] ReadAndShuffle end, idx:0 , size:100
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.321] [info] [ecdh_oprf_psi.cc:108] omp_get_num_threads:1 cpus:8
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.325] [info] [batch_provider.cc:318] cursor_index_:0, bucket_index_:0, 0-0
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.325] [info] [batch_provider.cc:240] cursor_index_:0, bucket_index_:0, 0-0
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.327] [info] [ecdh_oprf_psi.cc:192] FullEvaluate finished, batch_count=1 items_count=100
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.331] [info] [launch.cc:164] LEGACY PSI config: {"psi_type":"ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE","receiver_rank":1,"input_params":{"path":"/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique.csv","select_fields":["name"],"precheck":true},"output_params":{"path":"/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv","need_sort":true},"curve_type":"CURVE_FOURQ","bucket_size":1048576,"preprocess_path":"preprocess_cache"}
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.331] [info] [bucket_psi.cc:400] bucket size set to 1048576
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.337] [info] [bucket_psi.cc:425] Run psi protocol=8, self_items_count=0
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.337] [info] [bucket_ub_psi.cc:93] input file path:/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique.csv
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.337] [info] [bucket_ub_psi.cc:94] output file path:/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.337] [info] [bucket_ub_psi.cc:186] Start Sync
2024-05-23 15:50:08.430 WARNING api.py:607 [bob] -- [Anonymous_job] Encounter RemoteError happend in other parties, error message: FedRemoteError occurred at alice
Traceback (most recent call last):
  File "/root/project/psi2/sf_connect_ub.py", line 60, in <module>
    spu.psi_csv(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1848, in psi_csv
    return dispatch(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 111, in dispatch
    return _registrar.dispatch(self.device_type, name, self, *args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 80, in dispatch
    return self._ops[device_type][name](*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/kernels/spu.py", line 321, in psi_csv
    return sfd.get(res)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/distributed/primitive.py", line 156, in get
    return fed.get(object_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 613, in get
    raise e
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 602, in get
    values = ray.get(ray_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/worker.py", line 2524, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(FedRemoteError): ray::ReceiverProxyActor.get_data() (pid=19639, ip=192.168.15.7, actor_id=d23873c80f4140328261fbb701000000, repr=<fed.proxy.barriers.ReceiverProxyActor object at 0x7ff0b8729c90>)
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/proxy/barriers.py", line 236, in get_data
    raise data
fed.exceptions.FedRemoteError: FedRemoteError occurred at alice
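Each party's log above contains a "LEGACY PSI config:" line carrying the JSON configuration that party's SPU runtime actually used. A small helper that pulls that JSON out and compares two parties' configs key by key is a quick way to spot parameters that reached one side but not the other. A sketch only: the function names are hypothetical, and it assumes the JSON runs to the end of the line, as it does in these logs.

```python
import json

MARKER = "LEGACY PSI config: "


def parse_psi_config(log_line: str) -> dict:
    """Extract the JSON object that follows 'LEGACY PSI config: ' in a log line."""
    start = log_line.index(MARKER) + len(MARKER)
    return json.loads(log_line[start:])


def diff_configs(a: dict, b: dict):
    """Return (keys only in a, keys only in b, shared keys whose values differ)."""
    only_a = sorted(a.keys() - b.keys())
    only_b = sorted(b.keys() - a.keys())
    changed = sorted(k for k in a.keys() & b.keys() if a[k] != b[k])
    return only_a, only_b, changed
```

Applied to the two TRANSFER_CACHE lines above (alice's at 15:50:08.337 and bob's at 15:50:08.331), it reports preprocess_path as present only in bob's config, while input_params and output_params differ simply because each party reads and writes its own files.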

Thanks for your help!

@6fj
Member

6fj commented May 23, 2024

Hi @winnylyc,

I see the error 'MemoryError: std::bad_alloc'. Does your current setup meet the minimum requirement of at least 8 cores and 16 GB of memory per party?
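When both parties run on one host, as in this thread, each party effectively gets only a share of that host. A rough check, assuming an even split between the parties and the per-party minimum of 8 cores and 16 GB quoted above; the os.sysconf names used here are Linux/Unix-specific, and the function names are hypothetical.

```python
import os

# Per-party minimum quoted in this thread: 8 cores, 16 GB of RAM.
MIN_CORES_PER_PARTY = 8
MIN_MEM_GB_PER_PARTY = 16


def host_resources():
    """Return (logical CPUs, physical memory in GiB) for this Linux host."""
    cores = os.cpu_count() or 0
    mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")
    return cores, mem_bytes / 1024**3


def meets_minimum(cores, mem_gb, parties_on_host=2):
    """Assume the host's CPUs and memory are split evenly between the parties."""
    return (cores / parties_on_host >= MIN_CORES_PER_PARTY
            and mem_gb / parties_on_host >= MIN_MEM_GB_PER_PARTY)


if __name__ == "__main__":
    cores, mem_gb = host_resources()
    print(f"{cores} CPUs, {mem_gb:.1f} GiB; meets minimum for 2 parties: "
          f"{meets_minimum(cores, mem_gb)}")
```

With the two setups described in the reply, meets_minimum(8, 32) is False (4 cores per party) and meets_minimum(16, 32) is True. A nominal 32 GB machine may report slightly less usable memory, so treat the threshold as approximate rather than exact.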

@winnylyc
Author

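Hello, thank you very much for your answer.
The environment that produced the problem above indeed did not meet this minimum requirement: both parties were running in the same 8-core/32 GB WSL environment.
However, I then tried running both parties in the same 16-core/32 GB WSL environment, and I still get the same error as above.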
Sender:

/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
2024-05-23 16:22:09,571 INFO worker.py:1621 -- Started a local Ray instance.
2024-05-23 16:22:10.316 INFO api.py:233 [alice] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': '0.0.0.0:59179', 'bob': '127.0.0.1:53341'}, 'CURRENT_PARTY_NAME': 'alice', 'TLS_CONFIG': {}}
2024-05-23 16:22:10.992 INFO barriers.py:284 [alice] -- [Anonymous_job] Succeeded to create receiver proxy actor.
(ReceiverProxyActor pid=1237) 2024-05-23 16:22:10.986 INFO grpc_proxy.py:359 [alice] -- [Anonymous_job] ReceiverProxy binding port 59179, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"]}}]}'))...
(ReceiverProxyActor pid=1237) 2024-05-23 16:22:10.990 INFO grpc_proxy.py:379 [alice] -- [Anonymous_job] Successfully start Grpc service without credentials.
2024-05-23 16:22:11.629 INFO barriers.py:333 [alice] -- [Anonymous_job] SenderProxyActor has successfully created.
2024-05-23 16:22:11.629 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 0 attemp, up to 3600 attemps.
2024-05-23 16:22:14.633 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 1 attemp, up to 3600 attemps.
2024-05-23 16:22:17.636 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 2 attemp, up to 3600 attemps.
2024-05-23 16:22:20.638 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 3 attemp, up to 3600 attemps.
2024-05-23 16:22:23.641 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 4 attemp, up to 3600 attemps.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Complete offline phase
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Traceback (most recent call last):
  File "/root/project/psi1/sf_connect_ub.py", line 60, in <module>
    spu.psi_csv(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1848, in psi_csv
    return dispatch(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 111, in dispatch
    return _registrar.dispatch(self.device_type, name, self, *args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 80, in dispatch
    return self._ops[device_type][name](*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/kernels/spu.py", line 321, in psi_csv
    return sfd.get(res)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/distributed/primitive.py", line 156, in get
    return fed.get(object_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 602, in get
    values = ray.get(ray_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/worker.py", line 2524, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(MemoryError): ray::SPURuntime.psi_csv() (pid=2078, ip=192.168.15.7, actor_id=dece172e6245582cd500f73501000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
MemoryError: std::bad_alloc
(SPURuntime(device_id=None, party=alice) pid=2078) [2024-05-23 16:22:28.792] [info] [launch.cc:164] LEGACY PSI config: {"psi_type":"ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE","receiver_rank":1,"input_params":{"path":"/root/project/psi1/alice_exactpsi_1e6_unique.csv","select_fields":["name"],"precheck":true},"output_params":{"path":"/root/project/psi1/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv","need_sort":true},"curve_type":"CURVE_FOURQ","bucket_size":1048576}
(SPURuntime(device_id=None, party=alice) pid=2078) [2024-05-23 16:22:28.792] [info] [bucket_psi.cc:400] bucket size set to 1048576
(SPURuntime(device_id=None, party=alice) pid=2078) [2024-05-23 16:22:28.793] [info] [bucket_psi.cc:425] Run psi protocol=8, self_items_count=0
(SPURuntime(device_id=None, party=alice) pid=2078) [2024-05-23 16:22:28.794] [info] [bucket_ub_psi.cc:93] input file path:/root/project/psi1/alice_exactpsi_1e6_unique.csv
(SPURuntime(device_id=None, party=alice) pid=2078) [2024-05-23 16:22:28.794] [info] [bucket_ub_psi.cc:94] output file path:/root/project/psi1/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv
(SPURuntime(device_id=None, party=alice) pid=2078) [2024-05-23 16:22:28.794] [info] [ecdh_oprf_selector.cc:76] use fourq
2024-05-23 16:22:28.875 WARNING cleanup.py:154 [alice] -- [Anonymous_job] Failed to send ObjectRef(71b133a11e1c461c696f04c5cb75ea847a480d960100000001000000) to bob, error: ray::SenderProxyActor.send() (pid=1290, ip=192.168.15.7, actor_id=696f04c5cb75ea847a480d9601000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7f06f2b7dfc0>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.psi_csv() (pid=2078, ip=192.168.15.7, actor_id=dece172e6245582cd500f73501000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
MemoryError: std::bad_alloc,upstream_seq_id: 8#0, downstream_seq_id: 10.
2024-05-23 16:22:28.875 INFO cleanup.py:161 [alice] -- [Anonymous_job] Sending error std::bad_alloc to bob.
2024-05-23 16:22:28.876 WARNING cleanup.py:127 [alice] -- [Anonymous_job] Signal SIGINT to exit.
2024-05-23 16:22:28.876 WARNING api.py:60 [alice] -- [Anonymous_job] Stop signal received (e.g. via SIGINT/Ctrl+C), try to shutdown fed. Press CTRL+C (or send SIGINT/SIGKILL/SIGTERM) to skip.
2024-05-23 16:22:28.876 WARNING api.py:325 [alice] -- [Anonymous_job] Shutdowning rayfed unintendedly...
2024-05-23 16:22:28.876 ERROR api.py:330 [alice] -- [Anonymous_job] Cross-silo sending error occured. ray::SenderProxyActor.send() (pid=1290, ip=192.168.15.7, actor_id=696f04c5cb75ea847a480d9601000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7f06f2b7dfc0>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.psi_csv() (pid=2078, ip=192.168.15.7, actor_id=dece172e6245582cd500f73501000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
MemoryError: std::bad_alloc
2024-05-23 16:22:28.877 INFO api.py:337 [alice] -- [Anonymous_job] No wait for data sending.
2024-05-23 16:22:28.878 INFO message_queue.py:70 [alice] -- [Anonymous_job] Notify message polling thread[DataSendingQueueThread] to exit.
2024-05-23 16:22:28.878 INFO message_queue.py:70 [alice] -- [Anonymous_job] Notify message polling thread[ErrorSendingQueueThread] to exit.
2024-05-23 16:22:28.878 INFO api.py:352 [alice] -- [Anonymous_job] Shutdowned rayfed.
2024-05-23 16:22:28.878 CRITICAL api.py:356 [alice] -- [Anonymous_job] Exit now due to the previous error.
Exception ignored in: <module 'threading' from '/root/anaconda3/envs/psi/lib/python3.10/threading.py'>
Traceback (most recent call last):
  File "/root/anaconda3/envs/psi/lib/python3.10/threading.py", line 1567, in _shutdown
    lock.acquire()
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 65, in _signal_handler
    _shutdown(intended=False)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 357, in _shutdown
    sys.exit(1)
SystemExit: 1

Receiver:

/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
2024-05-23 16:22:16,732 INFO worker.py:1621 -- Started a local Ray instance.
2024-05-23 16:22:17.484 INFO api.py:233 [bob] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': '127.0.0.1:59179', 'bob': '0.0.0.0:53341'}, 'CURRENT_PARTY_NAME': 'bob', 'TLS_CONFIG': {}}
2024-05-23 16:22:18.110 INFO barriers.py:284 [bob] -- [Anonymous_job] Succeeded to create receiver proxy actor.
(ReceiverProxyActor pid=1911) 2024-05-23 16:22:18.106 INFO grpc_proxy.py:359 [bob] -- [Anonymous_job] ReceiverProxy binding port 53341, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"]}}]}'))...
(ReceiverProxyActor pid=1911) 2024-05-23 16:22:18.109 INFO grpc_proxy.py:379 [bob] -- [Anonymous_job] Successfully start Grpc service without credentials.
2024-05-23 16:22:18.743 INFO barriers.py:333 [bob] -- [Anonymous_job] SenderProxyActor has successfully created.
2024-05-23 16:22:18.743 INFO barriers.py:520 [bob] -- [Anonymous_job] Try ping ['alice'] at 0 attemp, up to 3600 attemps.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Complete offline phase
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.770] [info] [launch.cc:164] LEGACY PSI config: {"psi_type":"ECDH_OPRF_UB_PSI_2PC_GEN_CACHE","receiver_rank":1,"input_params":{"path":"/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique.csv","select_fields":["name"],"precheck":true},"output_params":{"path":"/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv","need_sort":true},"curve_type":"CURVE_FOURQ","bucket_size":1048576,"ecdh_secret_key_path":"/root/project/psi1/alice_oprf_key"}
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.770] [info] [bucket_psi.cc:425] Run psi protocol=7, self_items_count=0
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.778] [info] [bucket_ub_psi.cc:93] input file path:/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique.csv
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.778] [info] [bucket_ub_psi.cc:94] output file path:/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.778] [info] [ecdh_oprf_selector.cc:33] use fourq
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.779] [info] [batch_provider.cc:328] ReadAndShuffle start, idx:0, provider_batch_size:1048576
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.780] [info] [batch_provider.cc:350] ReadAndShuffle end, idx:0 , size:100
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.781] [info] [ecdh_oprf_psi.cc:108] omp_get_num_threads:1 cpus:16
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.781] [info] [ecdh_oprf_psi.cc:119] tid:0 omp_get_num_threads:16
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.781] [info] [batch_provider.cc:318] cursor_index_:0, bucket_index_:0, 0-0
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.781] [info] [batch_provider.cc:240] cursor_index_:0, bucket_index_:0, 0-0
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.783] [info] [ecdh_oprf_psi.cc:192] FullEvaluate finished, batch_count=1 items_count=100
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.786] [info] [launch.cc:164] LEGACY PSI config: {"psi_type":"ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE","receiver_rank":1,"input_params":{"path":"/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique.csv","select_fields":["name"],"precheck":true},"output_params":{"path":"/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv","need_sort":true},"curve_type":"CURVE_FOURQ","bucket_size":1048576,"preprocess_path":"preprocess_cache"}
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.786] [info] [bucket_psi.cc:400] bucket size set to 1048576
2024-05-23 16:22:28.882 WARNING api.py:607 [bob] -- [Anonymous_job] Encounter RemoteError happend in other parties, error message: FedRemoteError occurred at alice
Traceback (most recent call last):
  File "/root/project/psi2/sf_connect_ub.py", line 60, in <module>
    spu.psi_csv(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1848, in psi_csv
    return dispatch(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 111, in dispatch
    return _registrar.dispatch(self.device_type, name, self, *args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 80, in dispatch
    return self._ops[device_type][name](*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/kernels/spu.py", line 321, in psi_csv
    return sfd.get(res)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/distributed/primitive.py", line 156, in get
    return fed.get(object_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 613, in get
    raise e
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 602, in get
    values = ray.get(ray_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/worker.py", line 2524, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(FedRemoteError): ray::ReceiverProxyActor.get_data() (pid=1911, ip=192.168.15.7, actor_id=d22b5730c77662ce804d8aba01000000, repr=<fed.proxy.barriers.ReceiverProxyActor object at 0x7f24816c9d80>)
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/proxy/barriers.py", line 236, in get_data
    raise data
fed.exceptions.FedRemoteError: FedRemoteError occurred at alice
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.793] [info] [bucket_psi.cc:425] Run psi protocol=8, self_items_count=0
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.793] [info] [bucket_ub_psi.cc:93] input file path:/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique.csv
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.793] [info] [bucket_ub_psi.cc:94] output file path:/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.793] [info] [bucket_ub_psi.cc:186] Start Sync
2024-05-23 16:24:28.818 WARNING cleanup.py:154 [bob] -- [Anonymous_job] Failed to send ObjectRef(85748392bcd969ccdc36c0b6c411c57d2e34cc490100000001000000) to alice, error: ray::SenderProxyActor.send() (pid=1964, ip=192.168.15.7, actor_id=dc36c0b6c411c57d2e34cc4901000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7fde80cd9f60>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.psi_csv() (pid=2014, ip=192.168.15.7, actor_id=ea3f375776b2bf62a5995c0c01000000, repr=SPURuntime(device_id=None, party=bob))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
RuntimeError: what:
        [external/yacl/yacl/link/transport/channel.cc:411] Get data timeout, key=root:4:ALLGATHER
stacktrace:
#0 yacl::link::Context::RecvInternal()+0x7f973f4d2657
#1 yacl::link::AllGatherImpl<>()+0x7f973f4cd1b1
#2 yacl::link::AllGather()+0x7f973f4cd643
#3 psi::AllGatherItemsSize()+0x7f973f4cb295
#4 psi::UbPsiClientTransferCache()+0x7f973e1fc1ac
#5 psi::UbPsi()+0x7f973e2007ed
#6 psi::BucketPsi::RunPsi()+0x7f973e1f4528
#7 psi::BucketPsi::Run()+0x7f973e1f6960
#8 psi::RunLegacyPsi()+0x7f973e0aac04
#9 psi::BindLibs()::{lambda()#2}::operator()()+0x7f973e0a0640
#10 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f973e0a0baa
#11 pybind11::cpp_function::dispatcher()+0x7f973e082fed
#12 cfunction_call+0x4fd907,upstream_seq_id: 9#0, downstream_seq_id: 10.
2024-05-23 16:24:28.818 INFO cleanup.py:161 [bob] -- [Anonymous_job] Sending error what:
        [external/yacl/yacl/link/transport/channel.cc:411] Get data timeout, key=root:4:ALLGATHER
stacktrace:
#0 yacl::link::Context::RecvInternal()+0x7f973f4d2657
#1 yacl::link::AllGatherImpl<>()+0x7f973f4cd1b1
#2 yacl::link::AllGather()+0x7f973f4cd643
#3 psi::AllGatherItemsSize()+0x7f973f4cb295
#4 psi::UbPsiClientTransferCache()+0x7f973e1fc1ac
#5 psi::UbPsi()+0x7f973e2007ed
#6 psi::BucketPsi::RunPsi()+0x7f973e1f4528
#7 psi::BucketPsi::Run()+0x7f973e1f6960
#8 psi::RunLegacyPsi()+0x7f973e0aac04
#9 psi::BindLibs()::{lambda()#2}::operator()()+0x7f973e0a0640
#10 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f973e0a0baa
#11 pybind11::cpp_function::dispatcher()+0x7f973e082fed
#12 cfunction_call+0x4fd907

 to alice.
2024-05-23 16:24:28.819 WARNING cleanup.py:127 [bob] -- [Anonymous_job] Signal SIGINT to exit.
2024-05-23 16:24:28.819 WARNING api.py:60 [bob] -- [Anonymous_job] Stop signal received (e.g. via SIGINT/Ctrl+C), try to shutdown fed. Press CTRL+C (or send SIGINT/SIGKILL/SIGTERM) to skip.
2024-05-23 16:24:28.819 WARNING api.py:325 [bob] -- [Anonymous_job] Shutdowning rayfed unintendedly...
2024-05-23 16:24:28.819 ERROR api.py:330 [bob] -- [Anonymous_job] Cross-silo sending error occured. ray::SenderProxyActor.send() (pid=1964, ip=192.168.15.7, actor_id=dc36c0b6c411c57d2e34cc4901000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7fde80cd9f60>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.psi_csv() (pid=2014, ip=192.168.15.7, actor_id=ea3f375776b2bf62a5995c0c01000000, repr=SPURuntime(device_id=None, party=bob))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
RuntimeError: what:
        [external/yacl/yacl/link/transport/channel.cc:411] Get data timeout, key=root:4:ALLGATHER
stacktrace:
#0 yacl::link::Context::RecvInternal()+0x7f973f4d2657
#1 yacl::link::AllGatherImpl<>()+0x7f973f4cd1b1
#2 yacl::link::AllGather()+0x7f973f4cd643
#3 psi::AllGatherItemsSize()+0x7f973f4cb295
#4 psi::UbPsiClientTransferCache()+0x7f973e1fc1ac
#5 psi::UbPsi()+0x7f973e2007ed
#6 psi::BucketPsi::RunPsi()+0x7f973e1f4528
#7 psi::BucketPsi::Run()+0x7f973e1f6960
#8 psi::RunLegacyPsi()+0x7f973e0aac04
#9 psi::BindLibs()::{lambda()#2}::operator()()+0x7f973e0a0640
#10 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f973e0a0baa
#11 pybind11::cpp_function::dispatcher()+0x7f973e082fed
#12 cfunction_call+0x4fd907
2024-05-23 16:24:28.819 INFO api.py:337 [bob] -- [Anonymous_job] No wait for data sending.
2024-05-23 16:24:28.820 INFO message_queue.py:70 [bob] -- [Anonymous_job] Notify message polling thread[DataSendingQueueThread] to exit.
2024-05-23 16:24:28.820 INFO message_queue.py:70 [bob] -- [Anonymous_job] Notify message polling thread[ErrorSendingQueueThread] to exit.
2024-05-23 16:24:28.820 INFO api.py:352 [bob] -- [Anonymous_job] Shutdowned rayfed.
2024-05-23 16:24:28.820 CRITICAL api.py:356 [bob] -- [Anonymous_job] Exit now due to the previous error.
Exception ignored in: <module 'threading' from '/root/anaconda3/envs/psi/lib/python3.10/threading.py'>
Traceback (most recent call last):
  File "/root/anaconda3/envs/psi/lib/python3.10/threading.py", line 1567, in _shutdown
    lock.acquire()
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 65, in _signal_handler
    _shutdown(intended=False)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 357, in _shutdown
    sys.exit(1)
SystemExit: 1

This time I captured the complete logs. I hope they help.
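Each party's SPU runtime prints the settings it actually ran with as a JSON object after `LEGACY PSI config:` (the `launch.cc:164` lines above). To compare two parties' logs, you could use a small parser like the one below. It is a hypothetical helper, not part of SecretFlow or SPU, and it assumes each party's output has been saved as text. It extracts those lines and lists the fields that differ between two configs:

```python
import json
import re

# Captures the JSON object that follows "LEGACY PSI config:" at the end of a log line.
_CONFIG_RE = re.compile(r"LEGACY PSI config: (\{.*\})\s*$")


def extract_psi_configs(log_text):
    """Return every LEGACY PSI config found in a log, parsed into dicts, in log order."""
    configs = []
    for line in log_text.splitlines():
        match = _CONFIG_RE.search(line)
        if match:
            configs.append(json.loads(match.group(1)))
    return configs


def diff_configs(a, b):
    """Return {key: (value_in_a, value_in_b)} for every key whose values differ.

    A key present in only one config is reported with "<missing>" on the other side.
    """
    return {
        key: (a.get(key, "<missing>"), b.get(key, "<missing>"))
        for key in sorted(set(a) | set(b))
        if a.get(key, "<missing>") != b.get(key, "<missing>")
    }
```

For example, comparing the `ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE` config from the Sender log with the one from the Receiver log shows at a glance whether fields such as `preprocess_path` or `ecdh_secret_key_path` reached both parties.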

@6fj
Member

6fj commented May 24, 2024

hi @winnylyc

Could you try a smaller dataset, e.g. on the order of 1k records, and see whether the std::bad_alloc problem still occurs?
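For a test like this, you can generate a pair of small CSV files locally, each with a unique `name` column (the key column used by the scripts in this issue). The sketch below uses only the Python standard library. The function name, the `user_`/`query_only_` value prefixes, and the default overlap of 50 are illustrative assumptions, not taken from the original data:

```python
import csv


def write_test_csvs(server_path, query_path, server_count=1000, query_count=100, overlap=50):
    """Write two single-column CSVs keyed on 'name' and return the expected intersection.

    Every name is unique within its file; the first `overlap` query names are
    copied from the server set, so the exact intersection has `overlap` rows.
    """
    if not 0 <= overlap <= min(server_count, query_count):
        raise ValueError("overlap must be between 0 and min(server_count, query_count)")

    server_names = [f"user_{i}" for i in range(server_count)]
    # Names shared with the server come first; the rest use a prefix the server never uses.
    query_names = server_names[:overlap] + [
        f"query_only_{i}" for i in range(query_count - overlap)
    ]

    for path, names in ((server_path, server_names), (query_path, query_names)):
        with open(path, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["name"])  # header matches key=['name'] passed to psi_csv
            writer.writerows([name] for name in names)

    return set(server_names[:overlap])
```

Writing to the `alice_exactpsi_1e3_unique.csv` and `bob_exactpsi_1e3_to_1e2_unique.csv` paths used below would give a 1e3-to-1e2 test set whose correct intersection (50 rows by default) is known in advance.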

@winnylyc
Author

I tried again with 1,000 records on the Sender side and 100 on the Receiver side, and I still get the std::bad_alloc error.
The full code and error output are below.
Sender (Server):

import secretflow as sf
import spu
import time

cluster_config = {
    'parties' : {
        'alice': {
            'address': '127.0.0.1:59179',
            'listen_addr': '0.0.0.0:59179'
        },
        'bob': {
            'address': '127.0.0.1:53341',
            'listen_addr': '0.0.0.0:53341'
        }
    },
    'self_party': 'alice'
}
sf.shutdown()  # tear down any SecretFlow/Ray session left over from a previous run
sf.init(address='local', cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {
            "party": "alice",
            "address": "127.0.0.1:45413"
        },
        {
            "party": "bob",
            "address": "127.0.0.1:47480"
        },
    ],
    "runtime_config": {
        "protocol": spu.spu_pb2.SEMI2K,
        "field": spu.spu_pb2.FM128
    },
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    }
)

alice, bob = sf.PYU('alice'), sf.PYU('bob')
server_data_mount = '1e3'
query_data_mount = '1e2'
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:f'/root/project/psi1/alice_exactpsi_{server_data_mount}_unique.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique.csv'}, 
    output_path={alice:f'/root/project/psi1/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv'}, 
    receiver='bob', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_GEN_CACHE', 
    preprocess_path=f'preprocess_{server_data_mount}',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete offline phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:f'/root/project/psi1/alice_exactpsi_{server_data_mount}_unique.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique.csv'}, 
    output_path={alice:f'/root/project/psi1/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv'}, 
    receiver='bob', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE', 
    preprocess_path=f'preprocess_{server_data_mount}',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete online phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
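The two `psi_csv` calls above, and the matching calls in the Receiver script, differ only in `protocol`. Every path must agree across both phases and both parties. One way to keep them in sync is a helper that builds the shared keyword arguments once. The `ub_psi_kwargs` helper and the `PHASES` tuple below are hypothetical, not part of SecretFlow; the sketch reuses the paths and settings from the script as written:

```python
# Hypothetical helper: builds the psi_csv keyword arguments that the script passes
# in both phases, so the offline (GEN_CACHE) and online (TRANSFER_CACHE) calls
# cannot drift apart. Paths and settings mirror the script above.
PHASES = ("ECDH_OPRF_UB_PSI_2PC_GEN_CACHE", "ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE")


def ub_psi_kwargs(protocol, server, client, client_name, server_mount, query_mount):
    """Return psi_csv kwargs; `server`/`client` are the dict keys (e.g. the PYU objects)."""
    stem = f"exactpsi_{server_mount}_to_{query_mount}_unique"
    return {
        "key": {server: ["name"], client: ["name"]},
        "input_path": {
            server: f"/root/project/psi1/alice_exactpsi_{server_mount}_unique.csv",
            client: f"/root/project/psi2/bob_{stem}.csv",
        },
        "output_path": {
            server: f"/root/project/psi1/bob_{stem}_output_psi_ub.csv",
            client: f"/root/project/psi2/bob_{stem}_output_psi_ub.csv",
        },
        "receiver": client_name,
        "broadcast_result": False,
        "protocol": protocol,
        "preprocess_path": f"preprocess_{server_mount}",
        "ecdh_secret_key_path": "/root/project/psi1/alice_oprf_key",
        "curve_type": "CURVE_FOURQ",
    }
```

In the script, both phases would then become a loop over `PHASES` that calls `spu.psi_csv(**ub_psi_kwargs(protocol, alice, bob, 'bob', server_data_mount, query_data_mount))`. Because the Sender and Receiver scripts build identical arguments, a path typo can no longer affect only one side.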

Output:

/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
2024-05-27 10:58:01,308 INFO worker.py:1621 -- Started a local Ray instance.
2024-05-27 10:58:02.112 INFO api.py:233 [alice] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': '0.0.0.0:59179', 'bob': '127.0.0.1:53341'}, 'CURRENT_PARTY_NAME': 'alice', 'TLS_CONFIG': {}}
2024-05-27 10:58:02.789 INFO barriers.py:284 [alice] -- [Anonymous_job] Succeeded to create receiver proxy actor.
(ReceiverProxyActor pid=6577) 2024-05-27 10:58:02.783 INFO grpc_proxy.py:359 [alice] -- [Anonymous_job] ReceiverProxy binding port 59179, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"]}}]}'))...
(ReceiverProxyActor pid=6577) 2024-05-27 10:58:02.786 INFO grpc_proxy.py:379 [alice] -- [Anonymous_job] Successfully start Grpc service without credentials.
2024-05-27 10:58:03.935 INFO barriers.py:333 [alice] -- [Anonymous_job] SenderProxyActor has successfully created.
2024-05-27 10:58:03.935 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 0 attemp, up to 3600 attemps.
2024-05-27 10:58:06.940 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 1 attemp, up to 3600 attemps.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Complete offline phase
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Traceback (most recent call last):
  File "/root/project/psi1/sf_connect_ub.py", line 62, in <module>
    spu.psi_csv(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1848, in psi_csv
    return dispatch(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 111, in dispatch
    return _registrar.dispatch(self.device_type, name, self, *args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 80, in dispatch
    return self._ops[device_type][name](*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/kernels/spu.py", line 321, in psi_csv
    return sfd.get(res)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/distributed/primitive.py", line 156, in get
    return fed.get(object_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 602, in get
    values = ray.get(ray_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/worker.py", line 2524, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(MemoryError): ray::SPURuntime.psi_csv() (pid=7302, ip=192.168.15.7, actor_id=684fee1a8bf31d7850793d0301000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
MemoryError: std::bad_alloc
2024-05-27 10:58:10.182 WARNING cleanup.py:154 [alice] -- [Anonymous_job] Failed to send ObjectRef(85748392bcd969ccea7d0273a5056e9b0a95e4a50100000001000000) to bob, error: ray::SenderProxyActor.send() (pid=6690, ip=192.168.15.7, actor_id=ea7d0273a5056e9b0a95e4a501000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7f5d28319ff0>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.psi_csv() (pid=7302, ip=192.168.15.7, actor_id=684fee1a8bf31d7850793d0301000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
MemoryError: std::bad_alloc,upstream_seq_id: 8#0, downstream_seq_id: 10.
2024-05-27 10:58:10.182 INFO cleanup.py:161 [alice] -- [Anonymous_job] Sending error std::bad_alloc to bob.
2024-05-27 10:58:10.183 WARNING cleanup.py:127 [alice] -- [Anonymous_job] Signal SIGINT to exit.
2024-05-27 10:58:10.183 WARNING api.py:60 [alice] -- [Anonymous_job] Stop signal received (e.g. via SIGINT/Ctrl+C), try to shutdown fed. Press CTRL+C (or send SIGINT/SIGKILL/SIGTERM) to skip.
2024-05-27 10:58:10.183 WARNING api.py:325 [alice] -- [Anonymous_job] Shutdowning rayfed unintendedly...
2024-05-27 10:58:10.183 ERROR api.py:330 [alice] -- [Anonymous_job] Cross-silo sending error occured. ray::SenderProxyActor.send() (pid=6690, ip=192.168.15.7, actor_id=ea7d0273a5056e9b0a95e4a501000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7f5d28319ff0>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.psi_csv() (pid=7302, ip=192.168.15.7, actor_id=684fee1a8bf31d7850793d0301000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
MemoryError: std::bad_alloc
2024-05-27 10:58:10.183 INFO api.py:337 [alice] -- [Anonymous_job] No wait for data sending.
2024-05-27 10:58:10.186 INFO message_queue.py:70 [alice] -- [Anonymous_job] Notify message polling thread[DataSendingQueueThread] to exit.
2024-05-27 10:58:10.186 INFO message_queue.py:70 [alice] -- [Anonymous_job] Notify message polling thread[ErrorSendingQueueThread] to exit.
2024-05-27 10:58:10.186 INFO api.py:352 [alice] -- [Anonymous_job] Shutdowned rayfed.
2024-05-27 10:58:10.186 CRITICAL api.py:356 [alice] -- [Anonymous_job] Exit now due to the previous error.
Exception ignored in: <module 'threading' from '/root/anaconda3/envs/psi/lib/python3.10/threading.py'>
Traceback (most recent call last):
  File "/root/anaconda3/envs/psi/lib/python3.10/threading.py", line 1567, in _shutdown
(SPURuntime(device_id=None, party=alice) pid=7302) [2024-05-27 10:58:10.090] [info] [launch.cc:164] LEGACY PSI config: {"psi_type":"ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE","receiver_rank":1,"input_params":{"path":"/root/project/psi1/alice_exactpsi_1e3_unique.csv","select_fields":["name"],"precheck":true},"output_params":{"path":"/root/project/psi1/bob_exactpsi_1e3_to_1e2_unique_output_psi_ub.csv","need_sort":true},"curve_type":"CURVE_FOURQ","bucket_size":1048576}
(SPURuntime(device_id=None, party=alice) pid=7302) [2024-05-27 10:58:10.090] [info] [bucket_psi.cc:400] bucket size set to 1048576
(SPURuntime(device_id=None, party=alice) pid=7302) [2024-05-27 10:58:10.091] [info] [bucket_psi.cc:425] Run psi protocol=8, self_items_count=0
(SPURuntime(device_id=None, party=alice) pid=7302) [2024-05-27 10:58:10.093] [info] [bucket_ub_psi.cc:93] input file path:/root/project/psi1/alice_exactpsi_1e3_unique.csv
(SPURuntime(device_id=None, party=alice) pid=7302) [2024-05-27 10:58:10.093] [info] [bucket_ub_psi.cc:94] output file path:/root/project/psi1/bob_exactpsi_1e3_to_1e2_unique_output_psi_ub.csv
(SPURuntime(device_id=None, party=alice) pid=7302) [2024-05-27 10:58:10.093] [info] [ecdh_oprf_selector.cc:76] use fourq
    lock.acquire()
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 65, in _signal_handler
    _shutdown(intended=False)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 357, in _shutdown
    sys.exit(1)
SystemExit: 1

Receiver (Client):

import secretflow as sf
import spu
import time

cluster_config = {
    'parties' : {
        'alice': {
            'address': '127.0.0.1:59179',
            'listen_addr': '0.0.0.0:59179'
        },
        'bob': {
            'address': '127.0.0.1:53341',
            'listen_addr': '0.0.0.0:53341'
        }
    },
    'self_party': 'bob'
}
sf.shutdown()  # tear down any SecretFlow/Ray session left over from a previous run
sf.init(address='local', cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {
            "party": "alice",
            "address": "127.0.0.1:45413"
        },
        {
            "party": "bob",
            "address": "127.0.0.1:47480"
        },
    ],
    "runtime_config": {
        "protocol": spu.spu_pb2.SEMI2K,
        "field": spu.spu_pb2.FM128
    },
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    }
)

alice, bob = sf.PYU('alice'), sf.PYU('bob')
server_data_mount = '1e3'
query_data_mount = '1e2'
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:f'/root/project/psi1/alice_exactpsi_{server_data_mount}_unique.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique.csv'}, 
    output_path={alice:f'/root/project/psi1/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv'}, 
    receiver='bob', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_GEN_CACHE', 
    preprocess_path=f'preprocess_{server_data_mount}',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete offline phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:f'/root/project/psi1/alice_exactpsi_{server_data_mount}_unique.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique.csv'}, 
    output_path={alice:f'/root/project/psi1/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv'}, 
    receiver='bob', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE', 
    preprocess_path=f'preprocess_{server_data_mount}',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete online phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")

Output:

/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
2024-05-27 10:58:02,931 INFO worker.py:1621 -- Started a local Ray instance.
2024-05-27 10:58:03.746 INFO api.py:233 [bob] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': '127.0.0.1:59179', 'bob': '0.0.0.0:53341'}, 'CURRENT_PARTY_NAME': 'bob', 'TLS_CONFIG': {}}
2024-05-27 10:58:04.427 INFO barriers.py:284 [bob] -- [Anonymous_job] Succeeded to create receiver proxy actor.
(ReceiverProxyActor pid=7121) 2024-05-27 10:58:04.423 INFO grpc_proxy.py:359 [bob] -- [Anonymous_job] ReceiverProxy binding port 53341, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"]}}]}'))...
(ReceiverProxyActor pid=7121) 2024-05-27 10:58:04.425 INFO grpc_proxy.py:379 [bob] -- [Anonymous_job] Successfully start Grpc service without credentials.
2024-05-27 10:58:05.064 INFO barriers.py:333 [bob] -- [Anonymous_job] SenderProxyActor has successfully created.
2024-05-27 10:58:05.064 INFO barriers.py:520 [bob] -- [Anonymous_job] Try ping ['alice'] at 0 attemp, up to 3600 attemps.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Complete offline phase
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.072] [info] [launch.cc:164] LEGACY PSI config: {"psi_type":"ECDH_OPRF_UB_PSI_2PC_GEN_CACHE","receiver_rank":1,"input_params":{"path":"/root/project/psi2/bob_exactpsi_1e3_to_1e2_unique.csv","select_fields":["name"],"precheck":true},"output_params":{"path":"/root/project/psi2/bob_exactpsi_1e3_to_1e2_unique_output_psi_ub.csv","need_sort":true},"curve_type":"CURVE_FOURQ","bucket_size":1048576,"ecdh_secret_key_path":"/root/project/psi1/alice_oprf_key"}
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.072] [info] [bucket_psi.cc:425] Run psi protocol=7, self_items_count=0
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.073] [info] [bucket_ub_psi.cc:93] input file path:/root/project/psi2/bob_exactpsi_1e3_to_1e2_unique.csv
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.073] [info] [bucket_ub_psi.cc:94] output file path:/root/project/psi2/bob_exactpsi_1e3_to_1e2_unique_output_psi_ub.csv
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.073] [info] [ecdh_oprf_selector.cc:33] use fourq
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.073] [info] [batch_provider.cc:328] ReadAndShuffle start, idx:0, provider_batch_size:1048576
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.073] [info] [batch_provider.cc:350] ReadAndShuffle end, idx:0 , size:100
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.073] [info] [ecdh_oprf_psi.cc:108] omp_get_num_threads:1 cpus:16
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.077] [info] [ecdh_oprf_psi.cc:119] tid:0 omp_get_num_threads:16
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.077] [info] [batch_provider.cc:318] cursor_index_:0, bucket_index_:0, 0-0
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.077] [info] [batch_provider.cc:240] cursor_index_:0, bucket_index_:0, 0-0
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.080] [info] [ecdh_oprf_psi.cc:192] FullEvaluate finished, batch_count=1 items_count=100
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.085] [info] [launch.cc:164] LEGACY PSI config: {"psi_type":"ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE","receiver_rank":1,"input_params":{"path":"/root/project/psi2/bob_exactpsi_1e3_to_1e2_unique.csv","select_fields":["name"],"precheck":true},"output_params":{"path":"/root/project/psi2/bob_exactpsi_1e3_to_1e2_unique_output_psi_ub.csv","need_sort":true},"curve_type":"CURVE_FOURQ","bucket_size":1048576,"preprocess_path":"preprocess_1e3"}
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.085] [info] [bucket_psi.cc:400] bucket size set to 1048576
2024-05-27 10:58:10.188 WARNING api.py:607 [bob] -- [Anonymous_job] Encounter RemoteError happend in other parties, error message: FedRemoteError occurred at alice
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.091] [info] [bucket_psi.cc:425] Run psi protocol=8, self_items_count=0
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.092] [info] [bucket_ub_psi.cc:93] input file path:/root/project/psi2/bob_exactpsi_1e3_to_1e2_unique.csv
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.092] [info] [bucket_ub_psi.cc:94] output file path:/root/project/psi2/bob_exactpsi_1e3_to_1e2_unique_output_psi_ub.csv
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.092] [info] [bucket_ub_psi.cc:186] Start Sync
Traceback (most recent call last):
  File "/root/project/psi2/sf_connect_ub.py", line 62, in <module>
    spu.psi_csv(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1848, in psi_csv
    return dispatch(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 111, in dispatch
    return _registrar.dispatch(self.device_type, name, self, *args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 80, in dispatch
    return self._ops[device_type][name](*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/kernels/spu.py", line 321, in psi_csv
    return sfd.get(res)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/distributed/primitive.py", line 156, in get
    return fed.get(object_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 613, in get
    raise e
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 602, in get
    values = ray.get(ray_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/worker.py", line 2524, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(FedRemoteError): ray::ReceiverProxyActor.get_data() (pid=7121, ip=192.168.15.7, actor_id=d0e052dffaa2950908c0562001000000, repr=<fed.proxy.barriers.ReceiverProxyActor object at 0x7fdfa94edde0>)
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/proxy/barriers.py", line 236, in get_data
    raise data
fed.exceptions.FedRemoteError: FedRemoteError occurred at alice
2024-05-27 11:00:10.104 WARNING cleanup.py:154 [bob] -- [Anonymous_job] Failed to send ObjectRef(85748392bcd969cc02815ae281fdd273dc240ded0100000001000000) to alice, error: ray::SenderProxyActor.send() (pid=7200, ip=192.168.15.7, actor_id=02815ae281fdd273dc240ded01000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7f52a419df90>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.psi_csv() (pid=7250, ip=192.168.15.7, actor_id=37a37c5efe770003561c337501000000, repr=SPURuntime(device_id=None, party=bob))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
RuntimeError: what:
        [external/yacl/yacl/link/transport/channel.cc:411] Get data timeout, key=root:4:ALLGATHER
stacktrace:
#0 yacl::link::Context::RecvInternal()+0x7f4f967fc657
#1 yacl::link::AllGatherImpl<>()+0x7f4f967f71b1
#2 yacl::link::AllGather()+0x7f4f967f7643
#3 psi::AllGatherItemsSize()+0x7f4f967f5295
#4 psi::UbPsiClientTransferCache()+0x7f4f955261ac
#5 psi::UbPsi()+0x7f4f9552a7ed
#6 psi::BucketPsi::RunPsi()+0x7f4f9551e528
#7 psi::BucketPsi::Run()+0x7f4f95520960
#8 psi::RunLegacyPsi()+0x7f4f953d4c04
#9 psi::BindLibs()::{lambda()#2}::operator()()+0x7f4f953ca640
#10 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f4f953cabaa
#11 pybind11::cpp_function::dispatcher()+0x7f4f953acfed
#12 cfunction_call+0x4fd907,upstream_seq_id: 9#0, downstream_seq_id: 10.
2024-05-27 11:00:10.105 INFO cleanup.py:161 [bob] -- [Anonymous_job] Sending error what:
        [external/yacl/yacl/link/transport/channel.cc:411] Get data timeout, key=root:4:ALLGATHER
stacktrace:
#0 yacl::link::Context::RecvInternal()+0x7f4f967fc657
#1 yacl::link::AllGatherImpl<>()+0x7f4f967f71b1
#2 yacl::link::AllGather()+0x7f4f967f7643
#3 psi::AllGatherItemsSize()+0x7f4f967f5295
#4 psi::UbPsiClientTransferCache()+0x7f4f955261ac
#5 psi::UbPsi()+0x7f4f9552a7ed
#6 psi::BucketPsi::RunPsi()+0x7f4f9551e528
#7 psi::BucketPsi::Run()+0x7f4f95520960
#8 psi::RunLegacyPsi()+0x7f4f953d4c04
#9 psi::BindLibs()::{lambda()#2}::operator()()+0x7f4f953ca640
#10 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f4f953cabaa
#11 pybind11::cpp_function::dispatcher()+0x7f4f953acfed
#12 cfunction_call+0x4fd907

 to alice.
2024-05-27 11:00:10.105 WARNING cleanup.py:127 [bob] -- [Anonymous_job] Signal SIGINT to exit.
2024-05-27 11:00:10.106 WARNING api.py:60 [bob] -- [Anonymous_job] Stop signal received (e.g. via SIGINT/Ctrl+C), try to shutdown fed. Press CTRL+C (or send SIGINT/SIGKILL/SIGTERM) to skip.
2024-05-27 11:00:10.106 WARNING api.py:325 [bob] -- [Anonymous_job] Shutdowning rayfed unintendedly...
2024-05-27 11:00:10.106 ERROR api.py:330 [bob] -- [Anonymous_job] Cross-silo sending error occured. ray::SenderProxyActor.send() (pid=7200, ip=192.168.15.7, actor_id=02815ae281fdd273dc240ded01000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7f52a419df90>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.psi_csv() (pid=7250, ip=192.168.15.7, actor_id=37a37c5efe770003561c337501000000, repr=SPURuntime(device_id=None, party=bob))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
RuntimeError: what:
        [external/yacl/yacl/link/transport/channel.cc:411] Get data timeout, key=root:4:ALLGATHER
stacktrace:
#0 yacl::link::Context::RecvInternal()+0x7f4f967fc657
#1 yacl::link::AllGatherImpl<>()+0x7f4f967f71b1
#2 yacl::link::AllGather()+0x7f4f967f7643
#3 psi::AllGatherItemsSize()+0x7f4f967f5295
#4 psi::UbPsiClientTransferCache()+0x7f4f955261ac
#5 psi::UbPsi()+0x7f4f9552a7ed
#6 psi::BucketPsi::RunPsi()+0x7f4f9551e528
#7 psi::BucketPsi::Run()+0x7f4f95520960
#8 psi::RunLegacyPsi()+0x7f4f953d4c04
#9 psi::BindLibs()::{lambda()#2}::operator()()+0x7f4f953ca640
#10 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f4f953cabaa
#11 pybind11::cpp_function::dispatcher()+0x7f4f953acfed
#12 cfunction_call+0x4fd907
2024-05-27 11:00:10.106 INFO api.py:337 [bob] -- [Anonymous_job] No wait for data sending.
2024-05-27 11:00:10.107 INFO message_queue.py:70 [bob] -- [Anonymous_job] Notify message polling thread[ErrorSendingQueueThread] to exit.
2024-05-27 11:00:10.107 INFO api.py:352 [bob] -- [Anonymous_job] Shutdowned rayfed.
2024-05-27 11:00:10.107 CRITICAL api.py:356 [bob] -- [Anonymous_job] Exit now due to the previous error.
Exception ignored in: <module 'threading' from '/root/anaconda3/envs/psi/lib/python3.10/threading.py'>
Traceback (most recent call last):
  File "/root/anaconda3/envs/psi/lib/python3.10/threading.py", line 1567, in _shutdown
    lock.acquire()
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 65, in _signal_handler
    _shutdown(intended=False)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 357, in _shutdown
    sys.exit(1)
SystemExit: 1
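When a multi-step run fails like this, it helps to pull out the config each party actually ran. The sketch below assumes the SPU runtime prints that config as a single-line JSON object after `LEGACY PSI config:`, as in the `launch.cc:164` line of the log above. `extract_legacy_psi_config` and `summarize` are hypothetical helpers, not part of SecretFlow or SPU:

```python
import json
import re
from typing import Optional

# Matches the JSON object that follows "LEGACY PSI config:" in an SPU runtime log line.
_CONFIG_RE = re.compile(r"LEGACY PSI config:\s*(\{.*\})\s*$")


def extract_legacy_psi_config(log_line: str) -> Optional[dict]:
    """Return the PSI config logged on this line as a dict, or None if the line has none."""
    match = _CONFIG_RE.search(log_line)
    if match is None:
        return None
    return json.loads(match.group(1))


def summarize(config: dict) -> str:
    """One-line summary of the fields most relevant when debugging UB PSI steps."""
    return (
        f"{config.get('psi_type')} receiver_rank={config.get('receiver_rank')} "
        f"input={config.get('input_params', {}).get('path')} "
        f"preprocess={config.get('preprocess_path')}"
    )
```

For the bob-side line above, the helper reports `ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE` with `receiver_rank=1`. It also shows that bob's own CSV was the input path and `preprocess_1e3` was the preprocess path.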

@winnylyc
Author

winnylyc commented May 27, 2024

It seems my understanding of ECDH_OPRF_UB_PSI_2PC_GEN_CACHE is off. I assumed that ECDH_OPRF_UB_PSI_2PC_GEN_CACHE prepares the offline phase just like ECDH_OPRF_UB_PSI_2PC_OFFLINE, and differs only in moving the disk reads and writes into a cache.
Under that assumption, ECDH_OPRF_UB_PSI_2PC_GEN_CACHE should operate on the same data as ECDH_OPRF_UB_PSI_2PC_OFFLINE. However, when I call each of them on its own in SecretNote, their outputs differ.
ECDH_OPRF_UB_PSI_2PC_GEN_CACHE:
[screenshot]
ECDH_OPRF_UB_PSI_2PC_OFFLINE:
[screenshot]
In this scenario alice, the sender, holds 1,000 records and bob, the receiver, holds 100. The ECDH_OPRF_UB_PSI_2PC_OFFLINE output is computed over alice's 1,000 records, which is what I expected. ECDH_OPRF_UB_PSI_2PC_GEN_CACHE, however, operates on bob's 100 records, so I don't understand what ECDH_OPRF_UB_PSI_2PC_GEN_CACHE is for 😓.

I have two questions:

  1. What does ECDH_OPRF_UB_PSI_2PC_GEN_CACHE do?
  2. How can I reduce the disk reads and writes of the preprocess file when running ECDH_OPRF_UB_PSI_2PC_OFFLINE and ECDH_OPRF_UB_PSI_2PC_ONLINE?

@6fj
Member

6fj commented May 27, 2024

hi @winnylyc

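ECDH OPRF UB PSI roughly consists of two phases: an offline phase and an online phase. The offline phase can be further split into two steps: the large-dataset party generates a cache, and then the large-dataset party sends that cache to the small-dataset party.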

So you can invoke it in several ways:

  1. ECDH_OPRF_UB_PSI_2PC_GEN_CACHE + ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE + ECDH_OPRF_UB_PSI_2PC_ONLINE
  2. ECDH_OPRF_UB_PSI_2PC_OFFLINE + ECDH_OPRF_UB_PSI_2PC_ONLINE

If you replace ECDH_OPRF_UB_PSI_2PC_ONLINE with ECDH_OPRF_UB_PSI_2PC_SHUFFLE_ONLINE, the large-dataset party gets the result instead of the small-dataset party.
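The two call orders above can be written down as plain data. This sketch assumes, based on this reply, that ECDH_OPRF_UB_PSI_2PC_OFFLINE covers both offline sub-steps in a single call. `ub_psi_flow` and `result_holder` are hypothetical names, not SecretFlow APIs:

```python
# Call order 1: the offline phase split into two explicit steps.
CACHE_FLOW = [
    "ECDH_OPRF_UB_PSI_2PC_GEN_CACHE",       # offline: large-dataset party builds its cache
    "ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE",  # offline: cache is sent to the small-dataset party
    "ECDH_OPRF_UB_PSI_2PC_ONLINE",          # online: intersection with the small party's data
]

# Call order 2: one offline call followed by the online call.
OFFLINE_FLOW = [
    "ECDH_OPRF_UB_PSI_2PC_OFFLINE",
    "ECDH_OPRF_UB_PSI_2PC_ONLINE",
]


def ub_psi_flow(use_cache_steps: bool, result_to_large_party: bool = False) -> list[str]:
    """Return the protocol sequence for one of the two call orders."""
    flow = list(CACHE_FLOW if use_cache_steps else OFFLINE_FLOW)  # copy; never mutate the constants
    if result_to_large_party:
        # SHUFFLE_ONLINE replaces ONLINE when the large-dataset party should get the result.
        flow[-1] = "ECDH_OPRF_UB_PSI_2PC_SHUFFLE_ONLINE"
    return flow


def result_holder(online_protocol: str) -> str:
    """Which side ends up with the intersection for a given online protocol."""
    return "large" if online_protocol == "ECDH_OPRF_UB_PSI_2PC_SHUFFLE_ONLINE" else "small"
```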

@winnylyc
Author

Thanks for your answer!
I now understand what the four ECDH OPRF UB PSI protocols do.

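So is the output of ECDH_OPRF_UB_PSI_2PC_GEN_CACHE actually wrong?
Like ECDH_OPRF_UB_PSI_2PC_OFFLINE, it is supposed to operate on the large-dataset party's (sender's) data, so why do the outputs differ? Judging from my test above, ECDH_OPRF_UB_PSI_2PC_GEN_CACHE appears to operate on the small-dataset party's (receiver's) data.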

@6fj
Member

6fj commented May 27, 2024

You need to set receiver to the large-dataset party. At the moment you have set it to the small-dataset party.

@winnylyc
Author

winnylyc commented May 27, 2024

But the documentation at https://www.secretflow.org.cn/zh-CN/docs/secretflow/v1.6.1b0/source/secretflow.device.device.device#secretflow.device.device.spu.SPURuntime.psi_csv describes the receiver parameter as "receiver is client(small dataset party)".
Are ECDH_OPRF_UB_PSI_2PC_OFFLINE and ECDH_OPRF_UB_PSI_2PC_GEN_CACHE meant for different scenarios? That is, is ECDH_OPRF_UB_PSI_2PC_OFFLINE used when the receiver is the small-dataset party, and ECDH_OPRF_UB_PSI_2PC_GEN_CACHE when the receiver is the large-dataset party?

@6fj
Copy link
Member

6fj commented May 28, 2024

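The API here is indeed rather confusing:

ECDH_OPRF_UB_PSI_2PC_GEN_CACHE: set receiver to the large-dataset party
ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE: set receiver to the small-dataset party
ECDH_OPRF_UB_PSI_2PC_ONLINE: set receiver to the small-dataset party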
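The `receiver` argument refers to a different party depending on the step, so a lookup table helps keep the three calls consistent. This minimal sketch encodes only the three rules stated above. `RECEIVER_ROLE` and `receiver_for` are hypothetical, and any protocol not listed is rejected rather than guessed:

```python
# Receiver role per protocol, as stated in the reply above.
# "large" = large-dataset (server) party, "small" = small-dataset (client) party.
RECEIVER_ROLE = {
    "ECDH_OPRF_UB_PSI_2PC_GEN_CACHE": "large",
    "ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE": "small",
    "ECDH_OPRF_UB_PSI_2PC_ONLINE": "small",
}


def receiver_for(protocol: str, large_party: str, small_party: str) -> str:
    """Return the party name to pass as `receiver` for the given protocol."""
    try:
        role = RECEIVER_ROLE[protocol]
    except KeyError:
        raise ValueError(f"no receiver rule recorded for {protocol}") from None
    return large_party if role == "large" else small_party
```

In this thread alice holds the 1,000-row set and bob holds the 100-row set. For that setup, `receiver_for(p, "alice", "bob")` returns `alice` for GEN_CACHE and `bob` for the other two steps.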

@winnylyc
Author

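Hello,
I changed the receiver in the ECDH_OPRF_UB_PSI_2PC_GEN_CACHE step to the large-dataset party, but the ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE step still fails.
Below are the Sender and Receiver code, plus the server-side error from the ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE step. The client-side error doesn't seem to contain much useful information.
Sender (large-dataset party, server) code: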

import secretflow as sf
import spu
import time

cluster_config = {
    'parties' : {
        'alice': {
            'address': '127.0.0.1:59179',
            'listen_addr': '0.0.0.0:59179'
        },
        'bob': {
            'address': '127.0.0.1:53341',
            'listen_addr': '0.0.0.0:53341'
        }
    },
    'self_party': 'alice'
}
sf.shutdown
sf.init(address='local', cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {
            "party": "alice",
            "address": "127.0.0.1:45413"
        },
        {
            "party": "bob",
            "address": "127.0.0.1:47480"
        },
    ],
    "runtime_config": {
        "protocol": spu.spu_pb2.SEMI2K,
        "field": spu.spu_pb2.FM128
    },
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    }
)

alice, bob = sf.PYU('alice'), sf.PYU('bob')
server_data_mount = '1e3'
query_data_mount = '1e2'
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:f'/root/project/psi1/alice_exactpsi_{server_data_mount}_unique.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique.csv'}, 
    output_path={alice:f'/root/project/psi1/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv'}, 
    receiver='alice', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_GEN_CACHE', 
    preprocess_path=f'preprocess_{server_data_mount}',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete gen cache phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:f'/root/project/psi1/alice_exactpsi_{server_data_mount}_unique.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique.csv'}, 
    output_path={alice:f'/root/project/psi1/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv'}, 
    receiver='bob', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE', 
    preprocess_path=f'preprocess_{server_data_mount}',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete transfer cache phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:f'/root/project/psi1/alice_exactpsi_{server_data_mount}_unique.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique.csv'}, 
    output_path={alice:f'/root/project/psi1/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv'}, 
    receiver='bob', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_ONLINE', 
    preprocess_path=f'preprocess_{server_data_mount}',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete online phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")

Receiver (small-dataset party, client) code:

import secretflow as sf
import spu
import time

cluster_config = {
    'parties' : {
        'alice': {
            'address': '127.0.0.1:59179',
            'listen_addr': '0.0.0.0:59179'
        },
        'bob': {
            'address': '127.0.0.1:53341',
            'listen_addr': '0.0.0.0:53341'
        }
    },
    'self_party': 'bob'
}
sf.shutdown
sf.init(address='local', cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {
            "party": "alice",
            "address": "127.0.0.1:45413"
        },
        {
            "party": "bob",
            "address": "127.0.0.1:47480"
        },
    ],
    "runtime_config": {
        "protocol": spu.spu_pb2.SEMI2K,
        "field": spu.spu_pb2.FM128
    },
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    }
)

alice, bob = sf.PYU('alice'), sf.PYU('bob')
server_data_mount = '1e3'
query_data_mount = '1e2'
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:f'/root/project/psi1/alice_exactpsi_{server_data_mount}_unique.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique.csv'}, 
    output_path={alice:f'/root/project/psi1/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv'}, 
    receiver='alice', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_GEN_CACHE', 
    preprocess_path=f'preprocess_{server_data_mount}',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete gen cache phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:f'/root/project/psi1/alice_exactpsi_{server_data_mount}_unique.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique.csv'}, 
    output_path={alice:f'/root/project/psi1/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv'}, 
    receiver='bob', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE', 
    preprocess_path=f'preprocess_{server_data_mount}',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete transfer cache phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:f'/root/project/psi1/alice_exactpsi_{server_data_mount}_unique.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique.csv'}, 
    output_path={alice:f'/root/project/psi1/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv'}, 
    receiver='bob', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_ONLINE', 
    preprocess_path=f'preprocess_{server_data_mount}',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete online phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")

Sender (large-dataset party, server) error:

Traceback (most recent call last):
  File "/root/project/psi1/sf_connect_ub.py", line 62, in <module>
    spu.psi_csv(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1848, in psi_csv
    return dispatch(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 111, in dispatch
    return _registrar.dispatch(self.device_type, name, self, *args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 80, in dispatch
    return self._ops[device_type][name](*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/kernels/spu.py", line 321, in psi_csv
    return sfd.get(res)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/distributed/primitive.py", line 156, in get
    return fed.get(object_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 602, in get
    values = ray.get(ray_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/worker.py", line 2524, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(MemoryError): ray::SPURuntime.psi_csv() (pid=56359, ip=192.168.15.7, actor_id=452e9282d3675266f0bdc2a301000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
MemoryError: std::bad_alloc
2024-05-28 11:46:04.429 WARNING cleanup.py:154 [alice] -- [Anonymous_job] Failed to send ObjectRef(c54e76759b2a0c1009459f9c5de8d181d6ef602f0100000001000000) to bob, error: ray::SenderProxyActor.send() (pid=55543, ip=192.168.15.7, actor_id=09459f9c5de8d181d6ef602f01000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7fbf1e161f60>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.psi_csv() (pid=56359, ip=192.168.15.7, actor_id=452e9282d3675266f0bdc2a301000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
MemoryError: std::bad_alloc,upstream_seq_id: 8#0, downstream_seq_id: 10.
2024-05-28 11:46:04.430 INFO cleanup.py:161 [alice] -- [Anonymous_job] Sending error std::bad_alloc to bob.
2024-05-28 11:46:04.430 WARNING cleanup.py:127 [alice] -- [Anonymous_job] Signal SIGINT to exit.
2024-05-28 11:46:04.430 WARNING api.py:60 [alice] -- [Anonymous_job] Stop signal received (e.g. via SIGINT/Ctrl+C), try to shutdown fed. Press CTRL+C (or send SIGINT/SIGKILL/SIGTERM) to skip.
2024-05-28 11:46:04.431 WARNING api.py:325 [alice] -- [Anonymous_job] Shutdowning rayfed unintendedly...
2024-05-28 11:46:04.431 ERROR api.py:330 [alice] -- [Anonymous_job] Cross-silo sending error occured. ray::SenderProxyActor.send() (pid=55543, ip=192.168.15.7, actor_id=09459f9c5de8d181d6ef602f01000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7fbf1e161f60>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.psi_csv() (pid=56359, ip=192.168.15.7, actor_id=452e9282d3675266f0bdc2a301000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
MemoryError: std::bad_alloc
2024-05-28 11:46:04.431 INFO api.py:337 [alice] -- [Anonymous_job] No wait for data sending.
2024-05-28 11:46:04.432 INFO message_queue.py:70 [alice] -- [Anonymous_job] Notify message polling thread[DataSendingQueueThread] to exit.
2024-05-28 11:46:04.432 INFO message_queue.py:70 [alice] -- [Anonymous_job] Notify message polling thread[ErrorSendingQueueThread] to exit.
2024-05-28 11:46:04.432 INFO api.py:352 [alice] -- [Anonymous_job] Shutdowned rayfed.
2024-05-28 11:46:04.432 CRITICAL api.py:356 [alice] -- [Anonymous_job] Exit now due to the previous error.
Exception ignored in: <module 'threading' from '/root/anaconda3/envs/psi/lib/python3.10/threading.py'>
Traceback (most recent call last):
  File "/root/anaconda3/envs/psi/lib/python3.10/threading.py", line 1567, in _shutdown
    lock.acquire()
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 65, in _signal_handler
    _shutdown(intended=False)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 357, in _shutdown
    sys.exit(1)
SystemExit: 1

@6fj
Member

6fj commented Jun 3, 2024

hi @winnylyc, I can't reproduce the error you reported. Here is my code:

server:

import spu

import secretflow as sf

cluster_config = {
    "parties": {
        "alice": {"address": "127.0.0.1:59179", "listen_addr": "0.0.0.0:59179"},
        "bob": {"address": "127.0.0.1:53341", "listen_addr": "0.0.0.0:53341"},
    },
    "self_party": "alice",
}
sf.shutdown
sf.init(address="local", cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {"party": "alice", "address": "127.0.0.1:45413"},
        {"party": "bob", "address": "127.0.0.1:47480"},
    ],
    "runtime_config": {"protocol": spu.spu_pb2.SEMI2K, "field": spu.spu_pb2.FM128},
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    },
)

alice, bob = sf.PYU("alice"), sf.PYU("bob")

spu.psi_csv(
    key=["id_0", "id_1"],
    input_path="/tmp/server_input.csv",
    output_path="/tmp/server_cache",
    receiver="alice",
    broadcast_result=False,
    protocol="ECDH_OPRF_UB_PSI_2PC_GEN_CACHE",
    ecdh_secret_key_path="/tmp/alice_oprf_key.bin",
    curve_type="CURVE_FOURQ",
)

spu.psi_csv(
    key=[],
    input_path="/tmp/server_cache",
    output_path="",
    receiver="bob",
    broadcast_result=False,
    protocol="ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE",
    preprocess_path="/tmp/client_cache",
    ecdh_secret_key_path="",
    curve_type="CURVE_FOURQ",
)

spu.psi_csv(
    key={alice: [], bob: ["id_0", "id_1"]},
    input_path={
        alice: "",
        bob: "/tmp/client_input.csv",
    },
    output_path={
        alice: "",
        bob: "/tmp/client_output.csv",
    },
    receiver="bob",
    broadcast_result=False,
    protocol="ECDH_OPRF_UB_PSI_2PC_ONLINE",
    preprocess_path={
        alice: "/tmp/server_cache",
        bob: "/tmp/client_cache",
    },
    ecdh_secret_key_path="/tmp/alice_oprf_key.bin",
    curve_type="CURVE_FOURQ",
)

client:

import spu

import secretflow as sf

cluster_config = {
    "parties": {
        "alice": {"address": "127.0.0.1:59179", "listen_addr": "0.0.0.0:59179"},
        "bob": {"address": "127.0.0.1:53341", "listen_addr": "0.0.0.0:53341"},
    },
    "self_party": "bob",
}
sf.shutdown
sf.init(address="local", cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {"party": "alice", "address": "127.0.0.1:45413"},
        {"party": "bob", "address": "127.0.0.1:47480"},
    ],
    "runtime_config": {"protocol": spu.spu_pb2.SEMI2K, "field": spu.spu_pb2.FM128},
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    },
)

alice, bob = sf.PYU("alice"), sf.PYU("bob")

spu.psi_csv(
    key=["id_0", "id_1"],
    input_path="/tmp/server_input.csv",
    output_path="/tmp/server_cache",
    receiver="alice",
    broadcast_result=False,
    protocol="ECDH_OPRF_UB_PSI_2PC_GEN_CACHE",
    ecdh_secret_key_path="/tmp/alice_oprf_key.bin",
    curve_type="CURVE_FOURQ",
)

spu.psi_csv(
    key=[],
    input_path="/tmp/server_cache",
    output_path="",
    receiver="bob",
    broadcast_result=False,
    protocol="ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE",
    preprocess_path="/tmp/client_cache",
    ecdh_secret_key_path="",
    curve_type="CURVE_FOURQ",
)

spu.psi_csv(
    key={alice: [], bob: ["id_0", "id_1"]},
    input_path={
        alice: "",
        bob: "/tmp/client_input.csv",
    },
    output_path={
        alice: "",
        bob: "/tmp/client_output.csv",
    },
    receiver="bob",
    broadcast_result=False,
    protocol="ECDH_OPRF_UB_PSI_2PC_ONLINE",
    preprocess_path={
        alice: "/tmp/server_cache",
        bob: "/tmp/client_cache",
    },
    ecdh_secret_key_path="/tmp/alice_oprf_key.bin",
    curve_type="CURVE_FOURQ",
)

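Note that the UB PSI API is currently rather confusing. We recommend the new API v2, but UB PSI in API v2 does not yet support sending the result to the large-dataset party.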
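The working scripts above run the same three `spu.psi_csv` calls on both parties and differ only in `self_party`. The following sketch rebuilds those keyword arguments from a handful of paths so that both scripts can share them. `ub_psi_cache_calls` is a hypothetical helper, not a SecretFlow API. The role of each argument is inferred from the example above rather than from documentation, and the paths, key columns, and OPRF key location depend on your deployment:

```python
def ub_psi_cache_calls(server, client, server_name, client_name, keys,
                       server_input, client_input, server_cache, client_cache,
                       client_output, oprf_key_path, curve_type="CURVE_FOURQ"):
    """Keyword arguments for GEN_CACHE, TRANSFER_CACHE and ONLINE, mirroring the example.

    `server`/`client` are the dict keys (sf.PYU objects in a real script);
    `server_name`/`client_name` are the strings passed as `receiver`.
    """
    gen_cache = dict(
        key=keys,
        input_path=server_input,
        output_path=server_cache,          # server cache is written here
        receiver=server_name,              # GEN_CACHE: large-dataset party
        broadcast_result=False,
        protocol="ECDH_OPRF_UB_PSI_2PC_GEN_CACHE",
        ecdh_secret_key_path=oprf_key_path,
        curve_type=curve_type,
    )
    transfer_cache = dict(
        key=[],
        input_path=server_cache,           # server cache produced by GEN_CACHE
        output_path="",
        receiver=client_name,              # TRANSFER_CACHE: small-dataset party
        broadcast_result=False,
        protocol="ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE",
        preprocess_path=client_cache,      # later read by the client in ONLINE
        ecdh_secret_key_path="",
        curve_type=curve_type,
    )
    online = dict(
        key={server: [], client: keys},
        input_path={server: "", client: client_input},
        output_path={server: "", client: client_output},
        receiver=client_name,              # ONLINE: small-dataset party gets the result
        broadcast_result=False,
        protocol="ECDH_OPRF_UB_PSI_2PC_ONLINE",
        preprocess_path={server: server_cache, client: client_cache},
        ecdh_secret_key_path=oprf_key_path,
        curve_type=curve_type,
    )
    return [gen_cache, transfer_cache, online]
```

After `sf.init` and `sf.SPU(...)`, each party would run `for kwargs in ub_psi_cache_calls(alice, bob, "alice", "bob", ...): spu.psi_csv(**kwargs)` and pass identical arguments on both sides.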

@winnylyc
Author

winnylyc commented Jun 3, 2024

Thanks a lot for your help, I got the whole pipeline running!

@winnylyc winnylyc closed this as completed Jun 3, 2024