root/erlang.c

Revision 382:0c864e40fa03, 14.2 KB (checked in by roberto@…, 3 months ago)

uwsgi_log everywhere

Line 
1#ifdef UWSGI_ERLANG
2
3#include "uwsgi.h"
4
5extern struct uwsgi_server uwsgi;
6
7static void erlang_log(void);
8
9PyObject *py_erlang_connect(PyObject * self, PyObject * args) {
10
11        char *erlang_node;
12        int fd;
13
14
15        if (!PyArg_ParseTuple(args, "s:erlang_connect", &erlang_node)) {
16                return NULL;
17        }
18
19        UWSGI_SET_BLOCKING;
20        fd = erl_connect(erlang_node);
21        UWSGI_UNSET_BLOCKING;
22        return PyInt_FromLong(fd);
23}
24
25PyObject *py_erlang_recv_message(PyObject * self, PyObject * args) {
26        int erfd;
27        struct pollfd erpoll;
28        ErlMessage emsg;
29        PyObject *pyer = NULL;
30        unsigned char erlang_buffer[8192];
31        int eret;
32        int timeout = uwsgi.shared->options[UWSGI_OPTION_SOCKET_TIMEOUT];
33
34        if (!PyArg_ParseTuple(args, "i|i:erlang_recv_message", &erfd, &timeout)) {
35                return NULL;
36        }
37
38        if (erfd < 0) {
39                goto clear;
40        }
41
42        erpoll.fd = erfd;
43        erpoll.events = POLLIN;
44      cycle:
45        memset(&emsg, 0, sizeof(ErlMessage));
46        UWSGI_SET_BLOCKING;
47        if (timeout > 0) {
48                eret = poll(&erpoll, 1, timeout * 1000);
49                if (eret < 0) {
50                        uwsgi_error("poll()");
51                        goto clear;
52                }
53                else if (eret == 0) {
54                        goto clear;
55                }
56        }
57        if (erl_receive_msg(erfd, erlang_buffer, 8192, &emsg) == ERL_MSG) {
58                if (emsg.type == ERL_TICK) {
59                        goto cycle;
60                }
61                if (emsg.msg) {
62                        pyer = eterm_to_py(emsg.msg);
63                }
64                if (emsg.msg) {
65                        erl_free_compound(emsg.msg);
66                }
67                if (emsg.to) {
68                        erl_free_compound(emsg.to);
69                }
70                if (emsg.from) {
71                        erl_free_compound(emsg.from);
72                }
73                if (!pyer) {
74                        goto clear;
75                }
76                UWSGI_UNSET_BLOCKING;
77                return pyer;
78        }
79
80      clear:
81        UWSGI_UNSET_BLOCKING;
82        Py_INCREF(Py_None);
83        return Py_None;
84
85}
86
87PyObject *py_erlang_send_message(PyObject * self, PyObject * args) {
88
89        ETERM *pymessage;
90        ETERM *pid;
91        int erfd;
92        PyObject *ermessage, *erdest, *zero;
93
94        int er_number, er_serial, er_creation;
95        char *er_node;
96
97        if (!PyArg_ParseTuple(args, "iOO|i:erlang_send_message", &erfd, &erdest, &ermessage)) {
98                return NULL;
99        }
100
101        if (erfd < 0) {
102                goto clear;
103        }
104
105        if (!PyString_Check(erdest) && !PyDict_Check(erdest)) {
106                goto clear;
107        }
108
109        pymessage = py_to_eterm(ermessage);
110        if (!pymessage) {
111                goto clear;
112        }
113
114
115        if (PyString_Check(erdest)) {
116                if (!erl_reg_send(erfd, PyString_AsString(erdest), pymessage)) {
117                        erl_err_msg("erl_reg_send()");
118                        goto clear2;
119                }
120        }
121        else if (PyDict_Check(erdest)) {
122                zero = PyDict_GetItemString(erdest, "node");
123                if (!zero) {
124                        goto clear2;
125                }
126                if (!PyString_Check(zero)) {
127                        goto clear2;
128                }
129                er_node = PyString_AsString(zero);
130
131                zero = PyDict_GetItemString(erdest, "number");
132                if (!zero) {
133                        goto clear2;
134                }
135                if (!PyInt_Check(zero)) {
136                        goto clear2;
137                }
138                er_number = PyInt_AsLong(zero);
139
140                zero = PyDict_GetItemString(erdest, "serial");
141                if (!zero) {
142                        goto clear2;
143                }
144                if (!PyInt_Check(zero)) {
145                        goto clear2;
146                }
147                er_serial = PyInt_AsLong(zero);
148
149                zero = PyDict_GetItemString(erdest, "creation");
150                if (!zero) {
151                        goto clear2;
152                }
153                if (!PyInt_Check(zero)) {
154                        goto clear2;
155                }
156                er_creation = PyInt_AsLong(zero);
157
158                pid = erl_mk_pid((const char *) er_node, er_number, er_serial, er_creation);
159
160                if (!pid) {
161                        goto clear2;
162                }
163
164                if (!erl_send(erfd, pid, pymessage)) {
165                        erl_err_msg("erl_send()");
166                        erl_free_term(pid);
167                        goto clear2;
168                }
169
170                erl_free_term(pid);
171        }
172        else {
173                goto clear;
174        }
175
176        erl_free_compound(pymessage);
177
178        Py_INCREF(Py_True);
179        return Py_True;
180
181      clear2:
182        erl_free_compound(pymessage);
183      clear:
184        PyErr_Print();
185        Py_INCREF(Py_None);
186        return Py_None;
187}
188
189PyObject *py_erlang_rpc(PyObject * self, PyObject * args) {
190
191        int fd, timeout = uwsgi.shared->options[UWSGI_OPTION_SOCKET_TIMEOUT];
192        char *emod, *efun;
193        PyObject *eargs, *pyer = NULL;
194        ETERM *pyargs, *rex;
195        ErlMessage emsg;
196        int eret;
197
198        if (!PyArg_ParseTuple(args, "issO|i:erlang_rpc", &fd, &emod, &efun, &eargs, &timeout)) {
199                return NULL;
200        }
201
202        if (fd < 0) {
203                goto clear;
204        }
205
206        pyargs = py_to_eterm(eargs);
207        if (!pyargs) {
208                goto clear;
209        }
210
211
212        if (erl_rpc_to(fd, emod, efun, pyargs)) {
213                erl_err_msg("erl_rpc_to()");
214                goto clear2;
215        }
216
217
218      cycle:
219        memset(&emsg, 0, sizeof(ErlMessage));
220        UWSGI_SET_BLOCKING;
221        eret = erl_rpc_from(fd, timeout * 1000, &emsg);
222        if (eret == ERL_TICK) {
223                goto cycle;
224        }
225        else if (eret == ERL_MSG) {
226                if (emsg.msg) {
227                        if (ERL_IS_TUPLE(emsg.msg)) {
228                                rex = erl_element(1, emsg.msg);
229                                if (!rex) {
230                                        goto clear2;
231                                }
232                                if (!strncmp("rex", ERL_ATOM_PTR(rex), ERL_ATOM_SIZE(rex))) {
233                                        erl_free_term(rex);
234                                        rex = erl_element(2, emsg.msg);
235                                        if (!rex) {
236                                                goto clear2;
237                                        }
238                                        pyer = eterm_to_py(rex);
239                                        erl_free_term(rex);
240                                }
241                                else {
242                                        erl_free_term(rex);
243                                }
244                        }
245                }
246                if (emsg.msg) {
247                        erl_free_compound(emsg.msg);
248                }
249                if (emsg.to) {
250                        erl_free_compound(emsg.to);
251                }
252                if (emsg.from) {
253                        erl_free_compound(emsg.from);
254                }
255                if (!pyer) {
256                        goto clear2;
257                }
258                erl_free_compound(pyargs);
259                UWSGI_UNSET_BLOCKING;
260                return pyer;
261        }
262        else {
263                erl_err_msg("erl_rpc_from()");
264        }
265
266      clear2:
267        UWSGI_UNSET_BLOCKING;
268        erl_free_compound(pyargs);
269
270      clear:
271        Py_INCREF(Py_None);
272        return Py_None;
273}
274
275PyObject *py_erlang_close(PyObject * self, PyObject * args) {
276
277        int fd;
278
279        if (!PyArg_ParseTuple(args, "i:erlang_close", &fd)) {
280                return NULL;
281        }
282
283
284        if (fd >= 0) {
285                erl_close_connection(fd);
286        }
287
288        Py_INCREF(Py_True);
289        return Py_True;
290}
291
292static PyMethodDef uwsgi_erlang_methods[] = {
293        {"erlang_connect", py_erlang_connect, METH_VARARGS, ""},
294        {"erlang_send_message", py_erlang_send_message, METH_VARARGS, ""},
295        {"erlang_recv_message", py_erlang_recv_message, METH_VARARGS, ""},
296        {"erlang_rpc", py_erlang_rpc, METH_VARARGS, ""},
297        {"erlang_close", py_erlang_close, METH_VARARGS, ""},
298        {NULL, NULL},
299};
300
301
302int init_erlang(char *nodename, char *cookie) {
303
304        struct sockaddr_in e_addr;
305
306        char *ip;
307        char *node;
308        int efd;
309        int rlen;
310        char *cookiefile;
311        char *cookiehome;
312        char cookievalue[128];
313        int cookiefd;
314
315        PyMethodDef *uwsgi_function;
316
317
318        ip = strchr(nodename, '@');
319
320        if (ip == NULL) {
321                uwsgi_log( "*** invalid erlang node name ***\n");
322                return -1;
323        }
324
325        if (cookie == NULL) {
326                // get the cookie from the home
327                cookiehome = getenv("HOME");
328                if (!cookiehome) {
329                        uwsgi_log( "unable to get erlang cookie from your home.\n");
330                        return -1;
331                }
332                cookiefile = malloc(strlen(cookiehome) + 1 + strlen(".erlang.cookie") + 1);
333                if (!cookiefile) {
334                        uwsgi_error("malloc()");
335                }
336                cookiefile[0] = 0;
337                strcat(cookiefile, cookiehome);
338                strcat(cookiefile, "/.erlang.cookie");
339
340                cookiefd = open(cookiefile, O_RDONLY);
341                if (cookiefd < 0) {
342                        uwsgi_error("open()");
343                        free(cookiefile);
344                        return -1;
345                }
346
347                memset(cookievalue, 0, 128);
348                if (read(cookiefd, cookievalue, 127) < 1) {
349                        uwsgi_log( "invalid cookie found in %s\n", cookiefile);
350                        close(cookiefd);
351                        free(cookiefile);
352                        return -1;
353                }
354                cookie = cookievalue;
355                close(cookiefd);
356                free(cookiefile);
357        }
358
359        node = malloc((ip - nodename) + 1);
360        if (node == NULL) {
361                uwsgi_error("malloc()");
362                return -1;
363        }
364        memset(node, 0, (ip - nodename) + 1);
365        memcpy(node, nodename, ip - nodename);
366
367        erl_init(NULL, 0);
368
369        if (erl_connect_xinit(ip + 1, node, nodename, NULL, cookie, 0) == -1) {
370                uwsgi_log( "*** unable to initialize erlang c-node ***\n");
371                return -1;
372        }
373
374        efd = socket(AF_INET, SOCK_STREAM, 0);
375        if (efd < 0) {
376                uwsgi_error("socket()");
377                return -1;
378        }
379
380
381        memset(&e_addr, 0, sizeof(struct sockaddr_in));
382        e_addr.sin_family = AF_INET;
383        e_addr.sin_addr.s_addr = inet_addr(ip + 1);
384
385        rlen = 1;
386        if (setsockopt(efd, SOL_SOCKET, SO_REUSEADDR, &rlen, sizeof(rlen))) {
387                uwsgi_error("setsockopt()");
388                close(efd);
389                return -1;
390        }
391
392        if (bind(efd, (struct sockaddr *) &e_addr, sizeof(struct sockaddr_in)) < 0) {
393                uwsgi_error("bind()");
394                close(efd);
395                return -1;
396        }
397
398        rlen = sizeof(struct sockaddr_in);
399        if (getsockname(efd, (struct sockaddr *) &e_addr, (socklen_t *) & rlen)) {
400                uwsgi_error("getsockname()");
401                close(efd);
402                return -1;
403        }
404
405        if (listen(efd, uwsgi.listen_queue)) {
406                uwsgi_error("listen()");
407                close(efd);
408                return -1;
409        }
410
411        if (erl_publish(ntohs(e_addr.sin_port)) < 0) {
412                uwsgi_log( "*** unable to subscribe with EPMD ***\n");
413                close(efd);
414                return -1;
415        }
416
417        uwsgi_log( "Erlang C-Node initialized on port %d you can access it with name %s\n", ntohs(e_addr.sin_port), nodename);
418
419        for (uwsgi_function = uwsgi_erlang_methods; uwsgi_function->ml_name != NULL; uwsgi_function++) {
420                PyObject *func = PyCFunction_New(uwsgi_function, NULL);
421                PyDict_SetItemString(uwsgi.embedded_dict, uwsgi_function->ml_name, func);
422                Py_DECREF(func);
423        }
424
425        return efd;
426
427}
428
429ETERM *py_to_eterm(PyObject * pobj) {
430        int i;
431        int count;
432        PyObject *pobj2;
433        ETERM *eobj = NULL;
434        ETERM *eobj2 = NULL;
435        ETERM **eobj3;
436
437        if (pobj == NULL) {
438                return erl_mk_empty_list();
439        }
440
441        if (PyString_Check(pobj)) {
442                eobj = erl_mk_atom(PyString_AsString(pobj));
443        }
444        else if (PyInt_Check(pobj)) {
445                eobj = erl_mk_int(PyInt_AsLong(pobj));
446        }
447        else if (PyList_Check(pobj)) {
448                eobj = erl_mk_empty_list();
449                for (i = PyList_Size(pobj) - 1; i >= 0; i--) {
450                        pobj2 = PyList_GetItem(pobj, i);
451                        eobj2 = py_to_eterm(pobj2);
452                        eobj = erl_cons(eobj2, eobj);
453                }
454        }
455        else if (PyDict_Check(pobj)) {
456                // a pid
457                char *er_node;
458                int er_number, er_serial, er_creation;
459                pobj2 = PyDict_GetItemString(pobj, "node");
460                if (!pobj2) {
461                        PyErr_Print();
462                        goto clear;
463                }
464                if (!PyString_Check(pobj2)) {
465                        goto clear;
466                }
467                er_node = PyString_AsString(pobj2);
468
469                pobj2 = PyDict_GetItemString(pobj, "number");
470                if (!pobj2) {
471                        PyErr_Print();
472                        goto clear;
473                }
474                if (!PyInt_Check(pobj2)) {
475                        goto clear;
476                }
477                er_number = PyInt_AsLong(pobj2);
478
479                pobj2 = PyDict_GetItemString(pobj, "serial");
480                if (!pobj2) {
481                        PyErr_Print();
482                        goto clear;
483                }
484                if (!PyInt_Check(pobj2)) {
485                        goto clear;
486                }
487                er_serial = PyInt_AsLong(pobj2);
488
489                pobj2 = PyDict_GetItemString(pobj, "creation");
490                if (!pobj2) {
491                        PyErr_Print();
492                        goto clear;
493                }
494                if (!PyInt_Check(pobj2)) {
495                        goto clear;
496                }
497                er_creation = PyInt_AsLong(pobj2);
498
499                eobj = erl_mk_pid(er_node, er_number, er_serial, er_creation);
500        }
501        else if (PyTuple_Check(pobj)) {
502                count = PyTuple_Size(pobj);
503                eobj3 = malloc(sizeof(ETERM *) * count);
504                for (i = 0; i < count; i++) {
505                        pobj2 = PyTuple_GetItem(pobj, i);
506                        if (!pobj2) {
507                                break;
508                        }
509                        eobj3[i] = py_to_eterm(pobj2);
510                }
511                eobj = erl_mk_tuple(eobj3, count);
512                free(eobj3);
513        }
514        else {
515                uwsgi_log( "UNMANAGED PYTHON TYPE: %s\n", pobj->ob_type->tp_name);
516        }
517
518      clear:
519        if (eobj == NULL) {
520                return erl_mk_empty_list();
521        }
522
523        return eobj;
524}
525
526PyObject *eterm_to_py(ETERM * obj) {
527        int i;
528        int count;
529        ETERM *obj2;
530        PyObject *eobj = NULL;
531
532        if (obj == NULL) {
533                Py_INCREF(Py_None);
534                return Py_None;
535        }
536
537        switch (ERL_TYPE(obj)) {
538
539        case ERL_CONS:
540        case ERL_NIL:
541                count = erl_length(obj);
542                eobj = PyList_New(0);
543                for (i = 0; i < count; i++) {
544                        obj2 = erl_hd(obj);
545                        PyList_Append(eobj, eterm_to_py(obj2));
546                        obj = erl_tl(obj);
547                }
548                break;
549        case ERL_TUPLE:
550                eobj = PyTuple_New(erl_size(obj));
551                for (i = 1; i <= erl_size(obj); i++) {
552                        obj2 = erl_element(i, obj);
553                        PyTuple_SetItem(eobj, i - 1, eterm_to_py(obj2));
554                }
555                break;
556        case ERL_ATOM:
557                eobj = PyString_FromStringAndSize(ERL_ATOM_PTR(obj), ERL_ATOM_SIZE(obj));
558                break;
559        case ERL_INTEGER:
560                eobj = PyInt_FromLong(ERL_INT_VALUE(obj));
561                break;
562        case ERL_BINARY:
563                uwsgi_log( "FOUND A BINARY %.*s\n", ERL_BIN_SIZE(obj), ERL_BIN_PTR(obj));
564                break;
565        case ERL_PID:
566                eobj = PyDict_New();
567                if (PyDict_SetItemString(eobj, "node", PyString_FromString(ERL_PID_NODE(obj)))) {
568                        PyErr_Print();
569                        break;
570                }
571                if (PyDict_SetItemString(eobj, "number", PyInt_FromLong(ERL_PID_NUMBER(obj)))) {
572                        PyErr_Print();
573                        break;
574                }
575                if (PyDict_SetItemString(eobj, "serial", PyInt_FromLong(ERL_PID_SERIAL(obj)))) {
576                        PyErr_Print();
577                        break;
578                }
579                if (PyDict_SetItemString(eobj, "creation", PyInt_FromLong(ERL_PID_CREATION(obj)))) {
580                        PyErr_Print();
581                        break;
582                }
583        default:
584                uwsgi_log( "UNMANAGED ETERM TYPE: %d\n", ERL_TYPE(obj));
585                break;
586
587        }
588
589        if (eobj == NULL) {
590                Py_INCREF(Py_None);
591                return Py_None;
592        }
593
594        return eobj;
595}
596
597void erlang_loop(struct wsgi_request *wsgi_req) {
598
599        ErlConnect econn;
600        ErlMessage em;
601        ETERM *eresponse;
602
603        PyObject *callable = PyDict_GetItemString(uwsgi.embedded_dict, "erlang_func");
604        if (!callable) {
605                PyErr_Print();
606                uwsgi_log( "- you have not defined a uwsgi.erlang_func callable, Erlang message manager will be disabled until you define it -\n");
607        }
608
609        PyObject *pargs = PyTuple_New(1);
610        if (!pargs) {
611                PyErr_Print();
612                uwsgi_log( "- error preparing arg tuple for uwsgi.erlang_func callable, Erlang message manager will be disabled -\n");
613        }
614
615        while (uwsgi.workers[uwsgi.mywid].manage_next_request) {
616
617
618                UWSGI_CLEAR_STATUS;
619
620                wsgi_req->poll.fd = erl_accept(uwsgi.erlangfd, &econn);
621
622                if (wsgi_req->poll.fd >= 0) {
623
624                        UWSGI_SET_ERLANGING;
625                        for (;;) {
626                                if (erl_receive_msg(wsgi_req->poll.fd, (unsigned char *) wsgi_req->buffer, uwsgi.buffer_size, &em) == ERL_MSG) {
627                                        if (em.type == ERL_TICK)
628                                                continue;
629
630                                        if (!callable) {
631                                                callable = PyDict_GetItemString(uwsgi.embedded_dict, "erlang_func");
632                                        }
633
634                                        if (!callable) {
635                                                uwsgi_log( "- you still have not defined a uwsgi.erlang_func callable, Erlang message rejected -\n");
636                                        }
637
638                                        PyObject *zero = eterm_to_py(em.msg);
639                                        if (em.msg) {
640                                                erl_free_compound(em.msg);
641                                        }
642                                        if (em.to) {
643                                                erl_free_compound(em.to);
644                                        }
645
646                                        if (!zero) {
647                                                PyErr_Print();
648                                                continue;
649                                        }
650
651                                        if (PyTuple_SetItem(pargs, 0, zero)) {
652                                                PyErr_Print();
653                                                continue;
654                                        }
655
656                                        PyObject *erlang_result = PyEval_CallObject(callable, pargs);
657
658                                        //Py_DECREF(zero);
659
660                                        if (erlang_result) {
661                                                eresponse = py_to_eterm(erlang_result);
662                                                if (eresponse) {
663                                                        erl_send(wsgi_req->poll.fd, em.from, eresponse);
664                                                        erl_free_compound(eresponse);
665                                                }
666                                                Py_DECREF(erlang_result);
667                                        }
668
669                                        if (em.from) {
670                                                erl_free_compound(em.from);
671                                        }
672
673                                        uwsgi.workers[0].requests++;
674                                        uwsgi.workers[uwsgi.mywid].requests++;
675                                        if (uwsgi.shared->options[UWSGI_OPTION_LOGGING])
676                                                erlang_log();
677                                }
678                                else {
679                                        break;
680                                }
681                        }
682                        erl_close_connection(wsgi_req->poll.fd);
683
684                        UWSGI_UNSET_ERLANGING;
685                }
686        }
687}
688
689static void erlang_log() {
690        if (uwsgi.shared->options[UWSGI_OPTION_MEMORY_DEBUG]) {
691                get_memusage();
692        }
693        else {
694                uwsgi.workers[uwsgi.mywid].rss_size = 0;
695                uwsgi.workers[uwsgi.mywid].vsz_size = 0;
696        }
697        uwsgi_log( "[Erlang worker %d pid %d] request %llu done {rss: %llu vsz: %llu}\n", uwsgi.mywid, uwsgi.mypid, uwsgi.workers[uwsgi.mywid].requests, uwsgi.workers[uwsgi.mywid].rss_size, uwsgi.workers[uwsgi.mywid].vsz_size);
698}
699
700#else
701#warning "*** Erlang support is disabled ***"
702#endif
Note: See TracBrowser for help on using the browser.