[Libwebsockets] libwebsockets integration in zeroMQ zloop or libev, anyone?

Edwin van den Oetelaar oetelaar.automatisering at gmail.com
Sat May 11 14:23:26 CEST 2013


I have mixed czmq and libwebsockets; it is not very hard.
I will cut and paste some relevant code here, but not my whole application,
since that is internal code for now.


/*
     * create the websocket context now
     */

    if (NULL == (ctx = libwebsocket_create_context(&info))) {
        lwsl_err("ERR libwebsocket_create_context()");
        return -1;
    } else lwsl_notice("OK libwebsocket_create_context()");

    /*
     * create the ZMQ context
     */

    if (NULL == (zmq_context = zctx_new())) {
        lwsl_err("ERR zctx_new()");
        return -1;
    } else lwsl_notice("OK zctx_new()");

    /*
     * create ZMQ socket PUB socket
     */

    if (NULL == (zmq_publisher = zsocket_new(zmq_context, ZMQ_PUB))) {
        lwsl_err("ERR zsocket_new(ZMQ_PUB) %s", strerror(errno));
        return -1;
    } else lwsl_notice("OK zsocket_new(ZMQ_PUB)");

    /*
     * bind PUB sock to network
     */

    char uri[255] = {0};
    int rc = 0;

    sprintf(uri, "tcp://127.0.0.1:%d", ZMQ_NUTS_PORT_BASE); // 6626

    /* NETWORK BIND */

    if (-1 == (rc = zsocket_bind(zmq_publisher, uri))) {
        lwsl_err("ERR zsocket_bind(zmq_publisher,%s) %s %d",
                 uri, strerror(errno), rc);
        return -1;
    } else
        lwsl_notice("OK zsocket_bind(zmq_publisher,%s) port %d",
                    uri, rc);

    /* Connect the SUB socket */

    if (NULL == (zmq_subscriber = zsocket_new(zmq_context, ZMQ_SUB))) {
        lwsl_err("ERR zsocket_new(ZMQ_SUB) %s",
                 strerror(errno));
        return -1;
    } else lwsl_notice("OK zsocket_new(ZMQ_SUB)");

    // TODO find this with discovery service

    sprintf(uri, "tcp://127.0.0.1:%d", ZMQ_NUTS_PORT_BASE + 1);

    /* Subscribe to CHANNEL Filter */

    if (0 != (rc = zsocket_connect(zmq_subscriber, uri))) {
        lwsl_err("ERR zsocket_connect() %s %d", strerror(errno), rc);
        return -1;
    } else lwsl_notice("OK zsocket_connect(%p, %s)", zmq_subscriber, uri);

    zsocket_set_subscribe(zmq_subscriber, zmq_channel_name);

    lwsl_notice("OK zmq_setsockopt(ZMQ_SUBSCRIBE)");

    /*
     * add the ZMQ SUB socket to a hashtable for lookup
     */

    ht_sockets_t *s;
    s = malloc(sizeof (ht_sockets_t));
    s->socket = zmq_subscriber;

    HASH_ADD_PTR(zmq_sockets, /* ptr to head */
                 socket, /* the key field */
                 s /* ref to struct */); /* id: name of key field */

    /*
     * Add the 0MQ socket to the global list of watched sockets
     */

    pollfds[count_pollfds].fd = -1;
    pollfds[count_pollfds].socket = zmq_subscriber;
    pollfds[count_pollfds].events = ZMQ_POLLIN;
    pollfds[count_pollfds++].revents = 0;


// ---------------------------

/*
 * Poll set handed to zmq_poll(). It holds both 0MQ sockets (socket != NULL)
 * and libwebsockets file descriptors (socket == NULL, fd set).
 */
zmq_pollitem_t *pollfds;

/*
 * Room for max_poll_elements entries.
 * NOTE(review): the malloc() result is not checked here, and nothing in
 * this snippet guards count_pollfds against max_poll_elements - confirm
 * both are handled by the surrounding code.
 */
pollfds = malloc(max_poll_elements * sizeof (zmq_pollitem_t));

case LWS_CALLBACK_ADD_POLL_FD:
        fd_lookup[fd] = count_pollfds;
        pollfds[count_pollfds].socket = NULL; // THE ZMQ socket
        pollfds[count_pollfds].fd = fd;
        pollfds[count_pollfds].events = (int) (long) len;
        pollfds[count_pollfds++].revents = 0;
        break;

// -----------------------------------------

 while (keep_running) {
        /*
         * One iteration is a single zmq_poll() call over pollfds[], which
         * holds both the 0MQ sockets and the libwebsockets descriptors.
         */
        errno = 0;
        int n = zmq_poll(pollfds, count_pollfds, -1);
        if (n == 0) {
            /*
             * do idle tasks, timeout
             */
        } else if (n > 0) {
            /*
             * some sockets ready for action
             */
            int i;
            for (i = 0; i < count_pollfds; i++) {
                /*
                 * skip entries that have no work to do
                 */
                if (!pollfds[i].revents) {
                    continue;
                }

                if (pollfds[i].socket != NULL) {
                    /*
                     * socket!=0, this means my own servers
                     * - could be 0MQ
                     * - could be telnet
                     * - could be other
                     */
                    // lwsl_err("service other fd fd=%d %d", pollfds[i].fd, HASH_COUNT(zmq_sockets));
                    // ht_sockets_t *iter;
                    // for (iter = zmq_sockets; iter != NULL; iter = iter->hh.next)
                    //     lwsl_err("val = %p %p %p", iter, iter->socket, pollfds[i].socket);

                    /* is this socket a ZMQ socket */
                    ht_sockets_t *fs = NULL;
                    HASH_FIND_PTR(
                                  zmq_sockets, /* the hash table */
                                  &pollfds[i].socket, /* note & do not forget it */
                                  fs /* result pointer */
                                  );
                    // lwsl_err("find val = %p %p", fs, pollfds[i].socket);

                    /* check what we found in the lookup */
                    if (fs) {
                        /*
                         * do some 0MQ magic stuff
                         */
                        int error = handle_zmq_msg(fs->socket);
                        if (error) {
                            lwsl_err("handle_zmq_msg() fail");
                        } else {
                            lwsl_notice("handle_zmq_msg() ok");
                        }
                        /*
                         * Other socket families could be dispatched here
                         * in the same way (sketch, not compiled):
                         *
                         * } else if (is_socket(pollfds[i].socket,
                         *                      my_telnet_sockets,
                         *                      ARRAYSIZE(my_telnet_sockets))) {
                         *     ... do some Telnet magic stuff ...
                         * } else if (is_socket(pollfds[i].socket,
                         *                      my_other_sockets,
                         *                      ARRAYSIZE(my_other_sockets))) {
                         *     ... do some Other magic stuff ...
                         */
                    }
                } else {
                    /*
                     * socket==0, so LWS magic stuff
                     */
                    struct pollfd pollfd_copy;
                    /*
                     * Build a plain struct pollfd for LWS, which can not
                     * handle the extra fields of zmq_pollitem_t.
                     *
                     * zmq_poll() reports readiness as ZMQ_POLL* flags,
                     * whose bit values are not the POLL* values LWS tests
                     * for, so revents is translated instead of copied.
                     */
                    pollfd_copy.revents = 0;
                    if (pollfds[i].revents & ZMQ_POLLIN) {
                        pollfd_copy.revents |= POLLIN;
                    }
                    if (pollfds[i].revents & ZMQ_POLLOUT) {
                        pollfd_copy.revents |= POLLOUT;
                    }
                    if (pollfds[i].revents & ZMQ_POLLERR) {
                        pollfd_copy.revents |= POLLERR;
                    }
                    /*
                     * events is passed through unchanged.
                     * NOTE(review): presumably libwebsocket_service_fd()
                     * only inspects fd and revents - confirm.
                     */
                    pollfd_copy.events = pollfds[i].events;
                    pollfd_copy.fd = pollfds[i].fd;

                    /*
                     * LWS service the fd, will not block now
                     */

                    // lwsl_err("service fd=%d revents=%d events=%d", pollfd_copy.fd, pollfd_copy.revents, pollfd_copy.events);

                    int rv = libwebsocket_service_fd(ctx, &pollfd_copy);
                    /*
                     * check internal problems of LWS
                     */
                    if (rv != 0) {
                        lwsl_err("libwebsocket_service_fd() Fail (%d)", rv);
                        goto cleanup_exit;
                    }
                }
            }
        } else {
            /*
             * n<0
             * problem detected; capture errno before logging can change it
             */
            int poll_errno = errno;
            if (poll_errno == EINTR) {
                lwsl_warn("zmq_poll() <0 and got EINTR (ctrl-C maybe) exit now");
            } else {
                lwsl_err("zmq_poll() failed: %s", zmq_strerror(poll_errno));
            }
            goto cleanup_exit; /* leave the loop on EINTR and on any other error */
        }
    }



On Sat, May 11, 2013 at 2:05 PM, Michael Haberler <mail17 at mah.priv.at> wrote:
> Hi,
>
> I'm investigating integration of libwebsockets into a zeroMQ (actually czmq) event loop, as well as libev (different use case)
>
> the issue I have is:
>
> - libwebsockets assumes there is a poll(2) call in the external event loop and relies on the pollfds data structure
> - this is not a given with czmq (or libev for that matter), which might use one of a series of different event notification mechanisms: poll, epoll, select, kqueue etc
> - czmq deals with the issue by abstracting the actual system call arguments by providing its own event notification data structure, zmq_pollitem_t (see http://api.zeromq.org/3-2:zmq-poll)
> - libev does this in a very similar way with struct ev_watcher
>
> I'm unsure how to do this without butchering up the library code proper, which I'd prefer to avoid
>
> I'd be happy with a stopgap measure for now
>
> wouldn't it be better in the long term to remove the assumption of a particular underlying system call altogether?
>
> best regards
>
> Michael
>
>
>
> _______________________________________________
> Libwebsockets mailing list
> Libwebsockets at ml.libwebsockets.org
> http://ml.libwebsockets.org/mailman/listinfo/libwebsockets



More information about the Libwebsockets mailing list