ZMQ_STREAM support in czmq #2192

Open
sphaero opened this issue Jul 14, 2021 · 3 comments
@sphaero

sphaero commented Jul 14, 2021

I'm not sure if czmq supports the stream socket fully. I'm trying to add a test for it in zsock.c:

#ifdef ZMQ_STREAM
    zsock_t *streamrecv = zsock_new_stream("@tcp://127.0.0.1:1234");
    assert (streamrecv);

    zsock_t *streamsender = zsock_new_stream(">tcp://127.0.0.1:1234");
    assert (streamsender);

    zmsg_t *connectmsg = zmsg_recv(streamrecv); // we receive a message on connect
    zframe_t *id = zmsg_pop(connectmsg);        // first frame is id
    assert (id);
    assert (zframe_size(id) == 5);              // the id is 5 bytes
    zframe_t *empty = zmsg_pop(connectmsg);     // second frame is empty
    assert (empty);
    assert (zframe_size(empty) == 0);

    // send a http request, first get our routing id
    uint32_t rid = zsock_routing_id (streamsender);
    zmsg_t *request = zmsg_new();
    assert (request);
    rc = zmsg_addmem(request, &rid, sizeof(uint32_t));
    assert (rc == 0);
    rc = zmsg_addstr(request, "GET /\n\n");
    assert (rc == 0);
    rc = zmsg_send(&request, streamsender);
    assert (rc == 0);

    // receive the request
    zmsg_t * recvreq = zmsg_recv(streamrecv);
    assert (recvreq);
    zframe_t* ridframe = zmsg_pop(recvreq); // first frame is routing id
    uint32_t *rid2 = (uint32_t *)zframe_data(ridframe);
    assert (*rid2 == rid);
    char *httpreq = zmsg_popstr(recvreq);
    assert (streq(httpreq, "GET /\n\n"));
#endif

This asserts on the send: rc = zmsg_send(&request, streamsender);

The connect succeeds, but it seems that requesting the routing id is only supported for SERVER sockets?

Any thoughts, hints, or suggestions?

P.S. Cleanup is missing in the code.

@sphaero

sphaero commented Jul 16, 2021

It's a bit fuzzy, but the routing id should be used. However, the routing_id methods in czmq only deal with SERVER sockets, so this has to be done manually for now. We can get the socket's routing id as follows:

byte *rid = (byte *)zsock_identity(streamsender);

The ZMQ_IDENTITY option is deprecated in favour of ZMQ_ROUTING_ID. However, the routing_id methods in czmq are quite different from zsock_identity (which is generated?). zsock_identity returns the value directly from zmq_getsockopt, whereas zsock_routing_id returns a cached uint32_t which is only set for SERVER sockets on brecv.

So to work around this manually we need to call the zmq methods directly to retrieve the socket's id:

uint8_t ridr [256];
size_t rid_size = 256;
rc = zmq_getsockopt (zsock_resolve(streamsender), ZMQ_IDENTITY, ridr, &rid_size);
assert (rc == 0);

This is almost identical to calling zsock_identity. However, that returns a char *, so you cannot know the size.
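
To illustrate why the size matters, here is a minimal sketch in plain C with no czmq calls. It assumes the id is a 5-byte binary blob whose first byte is zero, which is how I understand libzmq generates these ids; treat that layout as an assumption. The stream_id_t type and its two helpers are hypothetical names made up for this example and are not part of czmq or libzmq.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

// Hypothetical holder that keeps a routing id together with its length,
// because the id is binary data and may contain zero bytes.
typedef struct {
    uint8_t data [256];
    size_t size;
} stream_id_t;

// Copy 'size' raw bytes into the holder.
// Returns 0 on success, -1 if the id does not fit.
static int
stream_id_set (stream_id_t *self, const void *bytes, size_t size)
{
    if (size > sizeof (self->data))
        return -1;
    memcpy (self->data, bytes, size);
    self->size = size;
    return 0;
}

// Compare two ids by length first, then by content.
// Returns 1 if they are equal, 0 otherwise.
static int
stream_id_eq (const stream_id_t *a, const stream_id_t *b)
{
    return a->size == b->size
        && memcmp (a->data, b->data, a->size) == 0;
}
```

With an id such as { 0x00, 0x6b, 0x8b, 0x45, 0x67 }, strlen on the char * stops at the first byte, so the length has to come from somewhere else, for example the size that zmq_getsockopt writes back. It also shows why a 4-byte uint32_t can never compare equal to a 5-byte id.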

The routing_id methods in czmq are draft. Therefore it could be an option to change them so they are more in sync with libzmq's documentation. But reading the code, it seems czmq is trying to make the routing id transparent so users are not bothered with it.

@sphaero

sphaero commented Jul 16, 2021

For now created #2193

@stale

stale bot commented Apr 16, 2022

This issue has been automatically marked as stale because it has not had recent activity for 90 days. It will be closed if no further activity occurs within 21 days. Thank you for your contributions.

@stale stale bot added the stale label Apr 16, 2022