{"schema":"libjg2-1",
"vpath":"/git/",
"avatar":"/git/avatar/",
"alang":"",
"gen_ut":1711625320,
"reponame":"libwebsockets",
"desc":"libwebsockets lightweight C networking library",
"owner": { "name": "Andy Green", "email": "andy@warmcat.com", "md5": "c50933ca2aa61e0fe2c43d46bb6b59cb" },"url":"https://libwebsockets.org/repo/libwebsockets",
"f":3,
"items": [
{"schema":"libjg2-1",
"cid":"e9188f96bde74028adc265d2cf832d53",
"oid":{ "oid": "e4be3317ee421f951fb16a85c7edacc9b8a0e6aa", "alias": [ "refs/heads/main"]},"blobname": "plugins/protocol_lws_mirror.c", "blob": "/*\n * libwebsockets-test-server - libwebsockets test implementation\n *\n * Written in 2010-2019 by Andy Green \u003candy@warmcat.com\u003e\n *\n * This file is made available under the Creative Commons CC0 1.0\n * Universal Public Domain Dedication.\n *\n * The person who associated a work with this deed has dedicated\n * the work to the public domain by waiving all of his or her rights\n * to the work worldwide under copyright law, including all related\n * and neighboring rights, to the extent allowed by law. You can copy,\n * modify, distribute and perform the work, even for commercial purposes,\n * all without asking permission.\n *\n * The test apps are intended to be adapted for use in your code, which\n * may be proprietary. So unlike the library itself, they are licensed\n * Public Domain.\n *\n * Notice that the lws_pthread... locking apis are all zero-footprint\n * NOPs in the case LWS_MAX_SMP \u003d\u003d 1, which is the default. When lws\n * is built for multiple service threads though, they resolve to their\n * pthreads equivalents.\n */\n\n#if !defined (LWS_PLUGIN_STATIC)\n#if !defined(LWS_DLL)\n#define LWS_DLL\n#endif\n#if !defined(LWS_INTERNAL)\n#define LWS_INTERNAL\n#endif\n#include \u003clibwebsockets.h\u003e\n#endif\n\n#include \u003cstring.h\u003e\n#include \u003cstdlib.h\u003e\n\n#define QUEUELEN 32\n/* queue free space below this, rx flow is disabled */\n#define RXFLOW_MIN (4)\n/* queue free space above this, rx flow is enabled */\n#define RXFLOW_MAX ((2 * QUEUELEN) / 3)\n\n#define MAX_MIRROR_INSTANCES 3\n\nstruct mirror_instance;\n\nstruct per_session_data__lws_mirror {\n\tstruct lws *wsi;\n\tstruct mirror_instance *mi;\n\tstruct per_session_data__lws_mirror *same_mi_pss_list;\n\tuint32_t tail;\n};\n\n/* this is the element in the ring */\nstruct a_message {\n\tvoid *payload;\n\tsize_t len;\n};\n\nstruct mirror_instance {\n\tstruct mirror_instance *next;\n\tlws_pthread_mutex(lock) /* protects all mirror instance data */\n\tstruct per_session_data__lws_mirror *same_mi_pss_list;\n\t/**\u003c must hold the the per_vhost_data__lws_mirror.lock as well\n\t * to change mi list membership */\n\tstruct lws_ring *ring;\n\tint messages_allocated;\n\tchar name[30];\n\tchar rx_enabled;\n};\n\nstruct per_vhost_data__lws_mirror {\n\tlws_pthread_mutex(lock) /* protects mi_list membership changes */\n\tstruct mirror_instance *mi_list;\n};\n\n\n/* enable or disable rx from all connections to this mirror instance */\nstatic void\n__mirror_rxflow_instance(struct mirror_instance *mi, int enable)\n{\n\tlws_start_foreach_ll(struct per_session_data__lws_mirror *,\n\t\t\t pss, mi-\u003esame_mi_pss_list) {\n\t\tlws_rx_flow_control(pss-\u003ewsi, enable);\n\t} lws_end_foreach_ll(pss, same_mi_pss_list);\n\n\tmi-\u003erx_enabled \u003d (char)enable;\n}\n\n/*\n * Find out which connection to this mirror instance has the longest number\n * of still unread elements in the ringbuffer and update the lws_ring \u0022oldest\n * tail\u0022 with it. Elements behind the \u0022oldest tail\u0022 are freed and recycled for\n * new head content. Elements after the \u0022oldest tail\u0022 are still waiting to be\n * read by somebody.\n *\n * If the oldest tail moved on from before, check if it created enough space\n * in the queue to re-enable RX flow control for the mirror instance.\n *\n * Mark connections that are at the oldest tail as being on a 3s timeout to\n * transmit something, otherwise the connection will be closed. Without this,\n * a choked or nonresponsive connection can block the FIFO from freeing up any\n * new space for new data.\n *\n * You can skip calling this if on your connection, before processing, the tail\n * was not equal to the current worst, ie, if the tail you will work on is !\u003d\n * lws_ring_get_oldest_tail(ring) then no need to call this when the tail\n * has changed; it wasn't the oldest so it won't change the oldest.\n *\n * Returns 0 if oldest unchanged or 1 if oldest changed from this call.\n */\nstatic int\n__mirror_update_worst_tail(struct mirror_instance *mi)\n{\n\tuint32_t wai, worst \u003d 0, worst_tail \u003d 0, oldest;\n\tstruct per_session_data__lws_mirror *worst_pss \u003d NULL;\n\n\toldest \u003d lws_ring_get_oldest_tail(mi-\u003ering);\n\n\tlws_start_foreach_ll(struct per_session_data__lws_mirror *,\n\t\t\t pss, mi-\u003esame_mi_pss_list) {\n\t\twai \u003d (uint32_t)lws_ring_get_count_waiting_elements(mi-\u003ering,\n\t\t\t\t\t\t\t\t\u0026pss-\u003etail);\n\t\tif (wai \u003e\u003d worst) {\n\t\t\tworst \u003d wai;\n\t\t\tworst_tail \u003d pss-\u003etail;\n\t\t\tworst_pss \u003d pss;\n\t\t}\n\t} lws_end_foreach_ll(pss, same_mi_pss_list);\n\n\tif (!worst_pss)\n\t\treturn 0;\n\n\tlws_ring_update_oldest_tail(mi-\u003ering, worst_tail);\n\tif (oldest \u003d\u003d lws_ring_get_oldest_tail(mi-\u003ering))\n\t\treturn 0;\n\t/*\n\t * The oldest tail did move on. Check if we should re-enable rx flow\n\t * for the mirror instance since we made some space now.\n\t */\n\tif (!mi-\u003erx_enabled \u0026\u0026 /* rx is disabled */\n\t lws_ring_get_count_free_elements(mi-\u003ering) \u003e\u003d RXFLOW_MAX)\n\t\t/* there is enough space, let's re-enable rx for our instance */\n\t\t__mirror_rxflow_instance(mi, 1);\n\n\t/* if nothing in queue, no timeout needed */\n\tif (!worst)\n\t\treturn 1;\n\n\t/*\n\t * The guy(s) with the oldest tail block the ringbuffer from recycling\n\t * the FIFO entries he has not read yet. Don't allow those guys to\n\t * block the FIFO operation for very long.\n\t */\n\tlws_start_foreach_ll(struct per_session_data__lws_mirror *,\n\t\t\t pss, mi-\u003esame_mi_pss_list) {\n\t\tif (pss-\u003etail \u003d\u003d worst_tail)\n\t\t\t/*\n\t\t\t * Our policy is if you are the slowest connection,\n\t\t\t * you had better transmit something to help with that\n\t\t\t * within 3s, or we will hang up on you to stop you\n\t\t\t * blocking the FIFO for everyone else.\n\t\t\t */\n\t\t\tlws_set_timeout(pss-\u003ewsi,\n\t\t\t\t\tPENDING_TIMEOUT_USER_REASON_BASE, 3);\n\t} lws_end_foreach_ll(pss, same_mi_pss_list);\n\n\treturn 1;\n}\n\nstatic void\n__mirror_callback_all_in_mi_on_writable(struct mirror_instance *mi)\n{\n\t/* ask for WRITABLE callback for every wsi on this mi */\n\tlws_start_foreach_ll(struct per_session_data__lws_mirror *,\n\t\t\t pss, mi-\u003esame_mi_pss_list) {\n\t\tlws_callback_on_writable(pss-\u003ewsi);\n\t} lws_end_foreach_ll(pss, same_mi_pss_list);\n}\n\nstatic void\n__mirror_destroy_message(void *_msg)\n{\n\tstruct a_message *msg \u003d _msg;\n\n\tfree(msg-\u003epayload);\n\tmsg-\u003epayload \u003d NULL;\n\tmsg-\u003elen \u003d 0;\n}\n\nstatic int\ncallback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason,\n\t\t void *user, void *in, size_t len)\n{\n\tstruct per_session_data__lws_mirror *pss \u003d\n\t\t\t(struct per_session_data__lws_mirror *)user;\n\tstruct per_vhost_data__lws_mirror *v \u003d\n\t\t\t(struct per_vhost_data__lws_mirror *)\n\t\t\tlws_protocol_vh_priv_get(lws_get_vhost(wsi),\n\t\t\t\t\t\t lws_get_protocol(wsi));\n\tchar name[300], update_worst, sent_something, *pn \u003d name;\n\tstruct mirror_instance *mi \u003d NULL;\n\tconst struct a_message *msg;\n\tstruct a_message amsg;\n\tuint32_t oldest_tail;\n\tint n, count_mi \u003d 0;\n\n\tswitch (reason) {\n\tcase LWS_CALLBACK_ESTABLISHED:\n\t\tlwsl_info(\u0022%s: LWS_CALLBACK_ESTABLISHED\u005cn\u0022, __func__);\n\t\tif (!v) {\n\t\t\tlws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),\n\t\t\t\t\tlws_get_protocol(wsi),\n\t\t\t\t\tsizeof(struct per_vhost_data__lws_mirror));\n\t\t\tv \u003d (struct per_vhost_data__lws_mirror *)\n\t\t\t\t\tlws_protocol_vh_priv_get(lws_get_vhost(wsi),\n\t\t\t\t\t\t\t\t lws_get_protocol(wsi));\n\t\t\tlws_pthread_mutex_init(\u0026v-\u003elock);\n\t\t}\n\n\t\t/*\n\t\t * mirror instance name... defaults to \u0022\u0022, but if URL includes\n\t\t * \u0022?mirror\u003dxxx\u0022, will be \u0022xxx\u0022\n\t\t */\n\n\t\tif (lws_get_urlarg_by_name_safe(wsi, \u0022mirror\u0022, name,\n\t\t\t\t\t sizeof(name) - 1) \u003c 0) {\n\t\t\tlwsl_debug(\u0022get urlarg failed\u005cn\u0022);\n\t\t\tname[0] \u003d '\u005c0';\n\t\t}\n\n\t\t//lwsl_notice(\u0022%s: mirror name '%s'\u005cn\u0022, __func__, pn);\n\n\t\t/* is there already a mirror instance of this name? */\n\n\t\tlws_pthread_mutex_lock(\u0026v-\u003elock); /* vhost lock { */\n\n\t\tlws_start_foreach_ll(struct mirror_instance *, mi1,\n\t\t\t\t v-\u003emi_list) {\n\t\t\tcount_mi++;\n\t\t\tif (!strcmp(pn, mi1-\u003ename)) {\n\t\t\t\t/* yes... we will join it */\n\t\t\t\tmi \u003d mi1;\n\t\t\t\tbreak;\n\t\t\t}\n\t\t} lws_end_foreach_ll(mi1, next);\n\n\t\tif (!mi) {\n\n\t\t\t/* no existing mirror instance for name */\n\t\t\tif (count_mi \u003d\u003d MAX_MIRROR_INSTANCES) {\n\t\t\t\tlws_pthread_mutex_unlock(\u0026v-\u003elock); /* } vh lock */\n\t\t\t\treturn -1;\n\t\t\t}\n\n\t\t\t/* create one with this name, and join it */\n\t\t\tmi \u003d malloc(sizeof(*mi));\n\t\t\tif (!mi)\n\t\t\t\tgoto bail1;\n\t\t\tmemset(mi, 0, sizeof(*mi));\n\t\t\tmi-\u003ering \u003d lws_ring_create(sizeof(struct a_message),\n\t\t\t\t\t\t QUEUELEN,\n\t\t\t\t\t\t __mirror_destroy_message);\n\t\t\tif (!mi-\u003ering) {\n\t\t\t\tfree(mi);\n\t\t\t\tgoto bail1;\n\t\t\t}\n\n\t\t\tmi-\u003enext \u003d v-\u003emi_list;\n\t\t\tv-\u003emi_list \u003d mi;\n\t\t\tlws_snprintf(mi-\u003ename, sizeof(mi-\u003ename) - 1, \u0022%s\u0022, pn);\n\t\t\tmi-\u003erx_enabled \u003d 1;\n\n\t\t\tlws_pthread_mutex_init(\u0026mi-\u003elock);\n\n\t\t\tlwsl_notice(\u0022Created new mi %p '%s'\u005cn\u0022, mi, pn);\n\t\t}\n\n\t\t/* add our pss to list of guys bound to this mi */\n\n\t\tlws_ll_fwd_insert(pss, same_mi_pss_list, mi-\u003esame_mi_pss_list);\n\n\t\t/* init the pss */\n\n\t\tpss-\u003emi \u003d mi;\n\t\tpss-\u003etail \u003d lws_ring_get_oldest_tail(mi-\u003ering);\n\t\tpss-\u003ewsi \u003d wsi;\n\n\t\tlws_pthread_mutex_unlock(\u0026v-\u003elock); /* } vhost lock */\n\t\tbreak;\n\nbail1:\n\t\tlws_pthread_mutex_unlock(\u0026v-\u003elock); /* } vhost lock */\n\t\treturn 1;\n\n\tcase LWS_CALLBACK_CLOSED:\n\t\t/* detach our pss from the mirror instance */\n\t\tmi \u003d pss-\u003emi;\n\t\tif (!mi)\n\t\t\tbreak;\n\n\t\tlws_pthread_mutex_lock(\u0026v-\u003elock); /* vhost lock { */\n\n\t\t/* remove our closing pss from its mirror instance list */\n\t\tlws_ll_fwd_remove(struct per_session_data__lws_mirror,\n\t\t\t\t same_mi_pss_list, pss, mi-\u003esame_mi_pss_list);\n\t\tpss-\u003emi \u003d NULL;\n\n\t\tif (mi-\u003esame_mi_pss_list) {\n\t\t\t/*\n\t\t\t * Still other pss using the mirror instance. The pss\n\t\t\t * going away may have had the oldest tail, reconfirm\n\t\t\t * using the remaining pss what is the current oldest\n\t\t\t * tail. If the oldest tail moves on, this call also\n\t\t\t * will re-enable rx flow control when appropriate.\n\t\t\t */\n\t\t\tlws_pthread_mutex_lock(\u0026mi-\u003elock); /* mi lock { */\n\t\t\t__mirror_update_worst_tail(mi);\n\t\t\tlws_pthread_mutex_unlock(\u0026mi-\u003elock); /* } mi lock */\n\t\t\tlws_pthread_mutex_unlock(\u0026v-\u003elock); /* } vhost lock */\n\t\t\tbreak;\n\t\t}\n\n\t\t/* No more pss using the mirror instance... delete mi */\n\n\t\tlws_start_foreach_llp(struct mirror_instance **,\n\t\t\t\tpmi, v-\u003emi_list) {\n\t\t\tif (*pmi \u003d\u003d mi) {\n\t\t\t\t*pmi \u003d (*pmi)-\u003enext;\n\n\t\t\t\tlws_ring_destroy(mi-\u003ering);\n\t\t\t\tlws_pthread_mutex_destroy(\u0026mi-\u003elock);\n\n\t\t\t\tfree(mi);\n\t\t\t\tbreak;\n\t\t\t}\n\t\t} lws_end_foreach_llp(pmi, next);\n\n\t\tlws_pthread_mutex_unlock(\u0026v-\u003elock); /* } vhost lock */\n\t\tbreak;\n\n\tcase LWS_CALLBACK_CONFIRM_EXTENSION_OKAY:\n\t\treturn 1; /* disallow compression */\n\n\tcase LWS_CALLBACK_PROTOCOL_INIT: /* per vhost */\n\t\tif (!v) {\n\t\t\tlws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),\n\t\t\t\tlws_get_protocol(wsi),\n\t\t\t\tsizeof(struct per_vhost_data__lws_mirror));\n\t\t\tv \u003d (struct per_vhost_data__lws_mirror *)\n\t\t\t\tlws_protocol_vh_priv_get(lws_get_vhost(wsi),\n\t\t\t\t\t\t\t lws_get_protocol(wsi));\n\t\t\tif (!v)\n\t\t\t\treturn 0;\n\t\t\tlws_pthread_mutex_init(\u0026v-\u003elock);\n\t\t}\n\t\tbreak;\n\n\tcase LWS_CALLBACK_PROTOCOL_DESTROY:\n\t\tlws_pthread_mutex_destroy(\u0026v-\u003elock);\n\t\tbreak;\n\n\tcase LWS_CALLBACK_SERVER_WRITEABLE:\n\t\tlws_pthread_mutex_lock(\u0026pss-\u003emi-\u003elock); /* instance lock { */\n\t\toldest_tail \u003d lws_ring_get_oldest_tail(pss-\u003emi-\u003ering);\n\t\tupdate_worst \u003d oldest_tail \u003d\u003d pss-\u003etail;\n\t\tsent_something \u003d 0;\n\n\t\tdo {\n\t\t\tmsg \u003d lws_ring_get_element(pss-\u003emi-\u003ering, \u0026pss-\u003etail);\n\t\t\tif (!msg)\n\t\t\t\tbreak;\n\n\t\t\tif (!msg-\u003epayload) {\n\t\t\t\tlwsl_err(\u0022%s: NULL payload: worst \u003d %d,\u0022\n\t\t\t\t\t \u0022 pss-\u003etail \u003d %d\u005cn\u0022, __func__,\n\t\t\t\t\t oldest_tail, pss-\u003etail);\n\t\t\t\tif (lws_ring_consume(pss-\u003emi-\u003ering, \u0026pss-\u003etail,\n\t\t\t\t\t\t NULL, 1))\n\t\t\t\t\tcontinue;\n\t\t\t\tbreak;\n\t\t\t}\n\n\t\t\tn \u003d lws_write(wsi, (unsigned char *)msg-\u003epayload +\n\t\t\t\t LWS_PRE, msg-\u003elen, LWS_WRITE_TEXT);\n\t\t\tif (n \u003c 0) {\n\t\t\t\tlwsl_info(\u0022%s: WRITEABLE: %d\u005cn\u0022, __func__, n);\n\n\t\t\t\tgoto bail2;\n\t\t\t}\n\t\t\tsent_something \u003d 1;\n\t\t\tlws_ring_consume(pss-\u003emi-\u003ering, \u0026pss-\u003etail, NULL, 1);\n\n\t\t} while (!lws_send_pipe_choked(wsi));\n\n\t\t/* if any left for us to send, ask for writeable again */\n\t\tif (lws_ring_get_count_waiting_elements(pss-\u003emi-\u003ering,\n\t\t\t\t\t\t\t\u0026pss-\u003etail))\n\t\t\tlws_callback_on_writable(wsi);\n\n\t\tif (!sent_something || !update_worst)\n\t\t\tgoto done1;\n\n\t\t/*\n\t\t * We are no longer holding the oldest tail (since we sent\n\t\t * something. So free us of the timeout related to hogging the\n\t\t * oldest tail.\n\t\t */\n\t\tlws_set_timeout(pss-\u003ewsi, NO_PENDING_TIMEOUT, 0);\n\t\t/*\n\t\t * If we were originally at the oldest fifo position of\n\t\t * all the tails, now we used some up we may have\n\t\t * changed the oldest fifo position and made some space.\n\t\t */\n\t\t__mirror_update_worst_tail(pss-\u003emi);\n\ndone1:\n\t\tlws_pthread_mutex_unlock(\u0026pss-\u003emi-\u003elock); /* } instance lock */\n\t\tbreak;\n\nbail2:\n\t\tlws_pthread_mutex_unlock(\u0026pss-\u003emi-\u003elock); /* } instance lock */\n\n\t\treturn -1;\n\n\tcase LWS_CALLBACK_RECEIVE:\n\t\tlws_pthread_mutex_lock(\u0026pss-\u003emi-\u003elock); /* mi lock { */\n\t\tn \u003d (int)lws_ring_get_count_free_elements(pss-\u003emi-\u003ering);\n\t\tif (!n) {\n\t\t\tlwsl_notice(\u0022dropping!\u005cn\u0022);\n\t\t\tif (pss-\u003emi-\u003erx_enabled)\n\t\t\t\t__mirror_rxflow_instance(pss-\u003emi, 0);\n\t\t\tgoto req_writable;\n\t\t}\n\n\t\tamsg.payload \u003d malloc(LWS_PRE + len);\n\t\tamsg.len \u003d len;\n\t\tif (!amsg.payload) {\n\t\t\tlwsl_notice(\u0022OOM: dropping\u005cn\u0022);\n\t\t\tgoto done2;\n\t\t}\n\n\t\tmemcpy((char *)amsg.payload + LWS_PRE, in, len);\n\t\tif (!lws_ring_insert(pss-\u003emi-\u003ering, \u0026amsg, 1)) {\n\t\t\t__mirror_destroy_message(\u0026amsg);\n\t\t\tlwsl_notice(\u0022dropping!\u005cn\u0022);\n\t\t\tif (pss-\u003emi-\u003erx_enabled)\n\t\t\t\t__mirror_rxflow_instance(pss-\u003emi, 0);\n\t\t\tgoto req_writable;\n\t\t}\n\n\t\tif (pss-\u003emi-\u003erx_enabled \u0026\u0026\n\t\t lws_ring_get_count_free_elements(pss-\u003emi-\u003ering) \u003c\n\t\t\t\t\t\t\t\t RXFLOW_MIN)\n\t\t\t__mirror_rxflow_instance(pss-\u003emi, 0);\n\nreq_writable:\n\t\t__mirror_callback_all_in_mi_on_writable(pss-\u003emi);\n\ndone2:\n\t\tlws_pthread_mutex_unlock(\u0026pss-\u003emi-\u003elock); /* } mi lock */\n\t\tbreak;\n\n\tcase LWS_CALLBACK_EVENT_WAIT_CANCELLED:\n\t\tlwsl_info(\u0022LWS_CALLBACK_EVENT_WAIT_CANCELLED\u005cn\u0022);\n\t\tbreak;\n\n\tdefault:\n\t\tbreak;\n\t}\n\n\treturn 0;\n}\n\n#define LWS_PLUGIN_PROTOCOL_MIRROR { \u005c\n\t\t\u0022lws-mirror-protocol\u0022, \u005c\n\t\tcallback_lws_mirror, \u005c\n\t\tsizeof(struct per_session_data__lws_mirror), \u005c\n\t\t4096, /* rx buf size must be \u003e\u003d permessage-deflate rx size */ \u005c\n\t\t0, NULL, 0 \u005c\n\t}\n\n#if !defined (LWS_PLUGIN_STATIC)\n\nLWS_VISIBLE const struct lws_protocols lws_mirror_protocols[] \u003d {\n\tLWS_PLUGIN_PROTOCOL_MIRROR\n};\n\nLWS_VISIBLE const lws_plugin_protocol_t lws_mirror \u003d {\n\t.hdr \u003d {\n\t\t\u0022lws mirror\u0022,\n\t\t\u0022lws_protocol_plugin\u0022,\n\t\tLWS_BUILD_HASH,\n\t\tLWS_PLUGIN_API_MAGIC\n\t},\n\n\t.protocols \u003d lws_mirror_protocols,\n\t.count_protocols \u003d LWS_ARRAY_SIZE(lws_mirror_protocols),\n\t.extensions \u003d NULL,\n\t.count_extensions \u003d 0,\n};\n\n#endif\n","s":{"c":1711625320,"u": 667}}
],"g": 2738,"chitpc": 0,"ehitpc": 0,"indexed":0
,
"ab": 1, "si": 0, "db":0, "di":0, "sat":0, "lfc": "0000"}