From 2a69106e01835cc4a451e67568cf75e664042fd0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hugo=20Ros=C3=A1rio?= Date: Thu, 8 Sep 2016 22:20:10 +0100 Subject: [PATCH 01/26] Update README --- README | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/README b/README index 7689df2..339c496 100644 --- a/README +++ b/README @@ -7,6 +7,20 @@ this library use part of libstomp code web of STOMP project: http://stomp.codehaus.org/ email: barlone@yandex.ru + +Compilation instructions for Ubuntu. +In order do compile the plugin, you need to install Apache Portable Runtime (APR) and MySQL client development packages. + +sudo apt-get install libapr1 libapr1-dev libmysqlclient-dev + +To install the UDF, just run the provided install script which will compile and install the library. +The MySQL root password will be requested for installing. + +sudo ./install.sh + +If you get errors of missing libraries, edit the Makefile and make sure the paths are correct for your system. + + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at @@ -19,15 +33,3 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - -Compile with (adapt the include and lib path to your environment): -> gcc -Wall -O2 -I/usr/local/include/apr-1 -> -I/usr/local/include/mysql \ - lib_mysqludf_stomp.c /usr/local/lib/libapr-1.so -shared \ - -o lib_mysqludf_stomp.so -> strip ./lib_mysqludf_stomp.so - -Add the functions to MySQL with: -mysql> CREATE FUNCTION stompsend RETURNS STRING SONAME "lib_mysqludf_stomp.so"; -mysql> CREATE FUNCTION stompsend1 RETURNS STRING SONAME "lib_mysqludf_stomp.so"; -mysql> CREATE FUNCTION stompsend2 RETURNS STRING SONAME "lib_mysqludf_stomp.so"; From fbf7a39edc83b3abc370c199a52741872b40da7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hugo=20Ros=C3=A1rio?= Date: Thu, 8 Sep 2016 22:21:36 +0100 Subject: [PATCH 02/26] Update README --- README | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README b/README index 339c496..c6800b6 100644 --- a/README +++ b/README @@ -8,7 +8,7 @@ web of STOMP project: http://stomp.codehaus.org/ email: barlone@yandex.ru -Compilation instructions for Ubuntu. +### Compilation instructions for Ubuntu. In order do compile the plugin, you need to install Apache Portable Runtime (APR) and MySQL client development packages. sudo apt-get install libapr1 libapr1-dev libmysqlclient-dev From 5c0eba8868a518566ce90f9ae54477cc7cac6294 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hugo=20Ros=C3=A1rio?= Date: Thu, 8 Sep 2016 22:22:01 +0100 Subject: [PATCH 03/26] Rename README to README.md --- README => README.md | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename README => README.md (100%) diff --git a/README b/README.md similarity index 100% rename from README rename to README.md From b5551d5eab96b16e03fa73f86996d058c11daf36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hugo=20Ros=C3=A1rio?= Date: Thu, 8 Sep 2016 22:23:35 +0100 Subject: [PATCH 04/26] Update README.md --- README.md | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index c6800b6..42c6974 100644 --- a/README.md +++ b/README.md @@ -1,18 +1,21 @@ lib_mysqludf_stomp - a library to send STOMP messages Copyright 2005 LogicBlaze Inc. + Copyright (C) 2011 Dmitry Demianov aka barlone this library use part of libstomp code - web of STOMP project: http://stomp.codehaus.org/ email: barlone@yandex.ru -### Compilation instructions for Ubuntu. +### Installing on Ubuntu. + +## Requirements In order do compile the plugin, you need to install Apache Portable Runtime (APR) and MySQL client development packages. sudo apt-get install libapr1 libapr1-dev libmysqlclient-dev +## Install To install the UDF, just run the provided install script which will compile and install the library. The MySQL root password will be requested for installing. @@ -20,7 +23,6 @@ sudo ./install.sh If you get errors of missing libraries, edit the Makefile and make sure the paths are correct for your system. - Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at From 1b47e0df02d60e2e5df9e73d9868e5446ec905d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hugo=20Ros=C3=A1rio?= Date: Thu, 8 Sep 2016 22:24:08 +0100 Subject: [PATCH 05/26] Update README.md --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 42c6974..d766642 100644 --- a/README.md +++ b/README.md @@ -8,20 +8,20 @@ web of STOMP project: http://stomp.codehaus.org/ email: barlone@yandex.ru -### Installing on Ubuntu. +# Installing on Ubuntu. -## Requirements +### Requirements In order do compile the plugin, you need to install Apache Portable Runtime (APR) and MySQL client development packages. sudo apt-get install libapr1 libapr1-dev libmysqlclient-dev -## Install +### Install To install the UDF, just run the provided install script which will compile and install the library. The MySQL root password will be requested for installing. sudo ./install.sh -If you get errors of missing libraries, edit the Makefile and make sure the paths are correct for your system. +#### If you get errors of missing libraries, edit the Makefile and make sure the paths are correct for your system. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. From 0bd3c8e1509f3d5fad79015d2aaff49a4fc40ee3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hugo=20Ros=C3=A1rio?= Date: Thu, 8 Sep 2016 22:29:49 +0100 Subject: [PATCH 06/26] Update README.md --- README.md | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index d766642..b90fa51 100644 --- a/README.md +++ b/README.md @@ -1,25 +1,28 @@ lib_mysqludf_stomp - a library to send STOMP messages + Copyright 2005 LogicBlaze Inc. Copyright (C) 2011 Dmitry Demianov aka barlone -this library use part of libstomp code -web of STOMP project: http://stomp.codehaus.org/ +this library use part of libstomp code + +web of STOMP project: http://stomp.codehaus.org + email: barlone@yandex.ru -# Installing on Ubuntu. +# Instructions for Ubuntu -### Requirements +### Install Dependencies In order do compile the plugin, you need to install Apache Portable Runtime (APR) and MySQL client development packages. -sudo apt-get install libapr1 libapr1-dev libmysqlclient-dev +`sudo apt-get install libapr1 libapr1-dev libmysqlclient-dev` -### Install +### Compile & Install To install the UDF, just run the provided install script which will compile and install the library. The MySQL root password will be requested for installing. -sudo ./install.sh +`sudo ./install.sh` #### If you get errors of missing libraries, edit the Makefile and make sure the paths are correct for your system. From cbb83132a006895049aa16555ce159f4d5967e00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hugo=20Ros=C3=A1rio?= Date: Thu, 8 Sep 2016 22:30:39 +0100 Subject: [PATCH 07/26] Update README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index b90fa51..d4c7b32 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -lib_mysqludf_stomp - a library to send STOMP messages +# lib_mysqludf_stomp - a library to send STOMP messages Copyright 2005 LogicBlaze Inc. @@ -11,7 +11,7 @@ web of STOMP project: http://stomp.codehaus.org email: barlone@yandex.ru -# Instructions for Ubuntu +## Instructions for Ubuntu ### Install Dependencies In order do compile the plugin, you need to install Apache Portable Runtime (APR) and MySQL client development packages. From f8f249fa6be2d2b7f7e2acf3ae2d6443c032ca2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hugo=20Ros=C3=A1rio?= Date: Thu, 8 Sep 2016 22:35:22 +0100 Subject: [PATCH 08/26] Update README.md --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index d4c7b32..7fda23f 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,5 @@ -# lib_mysqludf_stomp - a library to send STOMP messages +# lib_mysqludf_stomp +## a library to send STOMP messages Copyright 2005 LogicBlaze Inc. From 81ee8054cf7e0b3c5345dffddc23d1384b327b95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hugo=20Ros=C3=A1rio?= Date: Thu, 8 Sep 2016 22:35:41 +0100 Subject: [PATCH 09/26] Update README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 7fda23f..07b2006 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ -# lib_mysqludf_stomp -## a library to send STOMP messages +## lib_mysqludf_stomp +### a library to send STOMP messages Copyright 2005 LogicBlaze Inc. From c400ba7d9d08faabac63b5ed0bb1d6ed115c9653 Mon Sep 17 00:00:00 2001 From: ss Date: Thu, 8 Sep 2016 23:00:21 +0100 Subject: [PATCH 10/26] Update compilation process Modified Makefile and added install.sh script --- Makefile | 8 ++++++++ install.sh | 24 ++++++++++++++++++++++++ lib_mysqludf_stomp.so | Bin 13576 -> 0 bytes lib_mysqludf_stomp.sql | 27 +++++++++++++++++++++++++++ 4 files changed, 59 insertions(+) create mode 100644 Makefile create mode 100755 install.sh delete mode 100644 lib_mysqludf_stomp.so create mode 100644 lib_mysqludf_stomp.sql diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..dee5239 --- /dev/null +++ b/Makefile @@ -0,0 +1,8 @@ +MYSQL_DEV_DIR=/usr/include/mysql +APR_DEV_DIR=/usr/include/apr-1.0/ +MYSQL_LIB_DIR=/usr/lib/mysql/plugin +APR_LIB_DIR=/usr/lib/x86_64-linux-gnu + +install: + gcc -Wall -O2 -I$(APR_DEV_DIR) -I$(MYSQL_DEV_DIR) lib_mysqludf_stomp.c /$(APR_LIB_DIR)/libapr-1.so -shared -o $(MYSQL_LIB_DIR)/lib_mysqludf_stomp.so -fPIC + diff --git a/install.sh b/install.sh new file mode 100755 index 0000000..0c7a248 --- /dev/null +++ b/install.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +echo "Compiling the MySQL UDF" +make + +if test $? -ne 0; then + echo "ERROR: You need libmysqlclient and Apache Portable Runtime development software installed " + echo "to be able to compile this UDF, on Debian/Ubuntu just run:" + echo "apt-get install libapr1 libapr1-dev libmysqlclient-dev" + exit 1 +else + echo "MySQL UDF compiled successfully" +fi + +echo -e "\nPlease provide your MySQL root password to install the UDF..." + +mysql -u root -p mysql < lib_mysqludf_stomp.sql + +if test $? -ne 0; then + echo "ERROR: unable to install the UDF" + exit 1 +else + echo "MySQL UDF installed successfully" +fi diff --git a/lib_mysqludf_stomp.so b/lib_mysqludf_stomp.so deleted file mode 100644 index 35a2f5b7c0ebd782ac5ef54effc4020152fc77b6..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 13576 zcmeHOe{@t;et(&aFv7@;IAXNf^`Swb%19!#VCfniC!l6A3k+)yX2(fp0#hcLF!P25 z7glhbEzftROQomXqo=JLq~)Aymt8p$m91fenYBIbQf0d+o}=yJHd!J@MV7Yc^z*sz z-pMct5!d~zCztch=idAM-tWEl`~BV@Z{FkKU~^MdRh6bvwN|aMq+h0K^MPmQ*_`>B zu1(TrYu9L33%-&kdS}{ni704-f6lLI=e}Z-7Yb`ZlO*54Hn)Iox9J$lB=Vh-P6HLy zAx~1guf1zTQ5=<2`{m%5_wPjCg3tn4p~nZT1}#|@(zbqe+N?z{y}xm1G`jSsi@)Vu zAItrC7I-&-eiy{?QvOO1({#}9fIOfnpvrU|Dy|1zE*Wkh$Nyze4T$q#nqh->Dey}& zeTF|wi$t+wT3HyV`d0hkIT$!fYQ2KiVa^6zO&djbTvv{?vn+i39=@*Z04qv`}!;a2x zFZghC_KHC8XG2f*KKsj$k)IUs|6%CK$39+j%^SDv{`JmPB;}zw@73IH18)+-U|6!g!OG9>i>2H|5QbJ zQbl=YMf<<0;D;;9->n#rihowq*NzH(jM1e!{vb`qsyG5Xf*@=WWw*8j^7{}tD}d8A zxAp)95rkZ;EO(>-Bj_X#Wcel3ALL?0nvRv*jdt!vQN()qptu?3nzC{iSbCYhMJT_6 z#i{f!hP;lA;7?@#@d|ynq2I0OUyXML_zz+OI3VR8hQ3DV=LW;^9YWR%M;w&n{Q>zf zLP3^4LRm-W_e* A49z>LUJ8%Atj!0!u=q^tEvuoz>blS?7(%u{s@E zUUu_eDDfSaZbpB+kP{faS*}4odW4 zt+T#W;*(hGEWaZ0rxM?jco_~x@_iB)Nqk)54<)`J@jmP|l;0#VD{+^^LlS=?@jZ#N zaCu<6pu}d0uVe3I{U;J9>Sk3wsmiNp!+4 zEboNf9mubPor(XAJ&)*s9~`(O9AiB&4*L@yfqjWC*x!NudjS!Bydu#>KFWtArlQweC=dQZiBCxUH;EretffB41trEL{weif>@N~wxTTcGMdtt!VZE1f znEUfY)SV(?j(6j7=fK4=OGMjUL|oEdCgNgsl!(jRheX(7F5-pgCW4ko>>(+$PPINz|mH3>$~|@Om*k!VzHQe&A4O$%CvzW@y~3`_5Rhb_7$JwEws3sG0s%n z&C9@AW(VIvv*K>r1e&re#{K>^XMUYfQ7U zaHD2A>jrvIV`VWRE&rSk`TR~V{?7ULL!Uabr>YhWr}g~vY@O|`GKE<2&$E=U+bkMR z{jxtx^;Ox!-qqh|?c*E={HWO0cdzK)XYH(kJf?W1l|9Q`uQ4?*>P%l<@ksWtZ*^;* zRa@7$g9+`kL#|!EMhq|LdUcQ^wnvbE8Cp5NFV3HB1>I)rX{*(1X3m;h&Se))_T=7# zcB{o{HRR5Ca{t1@JfW*gm> zEBC6=0KG84P1Z)JG3u;ts4%`{Ek$!_Uk2Y}YmI zg&QGe_M30_@QnWG>A{uYbnM9vI8JVTdf)Yf-6)Fq8SVc|Jh>mD4gEibKHbB@Lr(hi z31(9+`t-ZZ-s7ggqs%V%=3dKep_dohes$V@rO)A4xA5ymFSU&C*ZSYGU+eYJeqHbz z`ZXv+k=`i_#d^IRp_9xcq6i&3_r%xn^CIFquLyWpvHYCTT3*W*;H(Zx0Q6k3eE z-VptmB$itYlf-i4`oMAn)L^**YO&nF)3Mw(^#|%SdpFB(xRAaSgxy$fQ0v8VyOrw~ z&jrwj9_;JL+3$NK6CD|EE# zUvZ)w%N}05`WsSp^UV3SuuxK4XJS`Tww#(he6wgof1zG%w%Bcs>DWeU z?Pj^&%o|q7ZQbcaq&)f^h(LoAdl5{a_9C<2^XQ8xaz6RciG1)Kb0tQMeT-Tus0Lwe zIbklAleKz8X!g7bDNzL#0?g zJd`}!pT=mdptmwQIO%-wEoFHQ5*H#`&G}}+vvxF#SgiYJH7K=XDyKw;qJhgy6}Wx5;{`U-3XS$b*l3@NjrK9gUtDnH zvS2iL4g|gVZ@Dp1AN25tY!TU%U_A2PvzTkZsn&q!ytBMx2U-f}bC}A;R>18!&^X(U zR@E}ouh<+gM>H~VEE?`F&c-`ErS8w;J_eN{I!d?m`SUtHFh z0k5(#+bt@!Lo?3Pl@B<}1$Y6qPzBBTfUBJIsR&k=dClNy{e)|I@SVDPSIhcSqiFt*q#^1;%6 z#j!SHYT|aA-*U7NJo>2)S-G7Rf^TvBg`l|Go55rDEtvZRv|8uui7Gu%@~bi>R%jnu zvU{lIXfSoIE9d(V5>B^*2UR`vx~)(TcPk5x)V3Nte7->MD1rT@LB2KKAg?MLDzljt^cC$f0F|T;sfTey=|B7$@ORCabynL z*W@woCiq7SspL8yNU@Z-)`n{O9aYe8YkyvhRouf%`Y)(mT+nNx zdf+~2U$!h5fcBMo2Tu07NM{UNYE8dZHP{Ge*iSR|NqX^mR|Ki|E}chs3L1IYu*7%= zPa<;5??WPfn@7bjVixg>m__^|W)Z&^8?&>O8^2)8HYz=%Vz%cqWA-yc^5DgVq_1oY zzr~PjmLa*L!moBbj(JzU_;_p{KOV=_esS>_7%v{j)jLi+p1NQ>;)0hc?Wc%A#N)-r zpj-Lv3&tSd>xwOFR1EfBL=38}dE8y^^0F?zc!*VF*9&mhYc`LS$H652K2U$W&?J5w zFpm|QTu~iQ0ZsN(K$HCx(B$LK@r9-uYw!Sy7#U&)m6AwQ$8CU0@V^-eVvZtKIBi1A^GN;w=RAs-# zJ%y$Ny+g&~AK`bm(z}KR7?jUce4ygfl>MeiJkgFRBi^pyiH8z=)=6k1FDtN`O1j37 zo1(!{2A=xn;vLZ%W9@AkOmLUR*IRfqX2g?S+Ff_AfVpYwv0vl!xSB{47MQTWgasxn zFkyiS3rtvG!U7W(n6SWv1tu);|G)xxnk{k~3Yo`m4cgc6`zVp$n(;LNzZu2nMB-ai zzA>l)6A#YG2f@FDeEi|>`)<6Em;(Gu)Z=p=@tqo9Qk=dW-?hjx-$dY39`SwmGtkTY zTllpfpAl(CAdF8{#5Y(ofJ}HkPd)=a#+c-ohJh>VQ7BwIjG;@H=BO(QL8_d z6rUjaWBAUKzt?L1b|aZeqXQfMxE+e1B+?mgM+ZqX#2>%VX1Bz*i)fVWra>QbT{>F+^Gj{Dqc$oWcAwhZO=m&tLjalvV< vS4rjh-k<$*eC|dN Date: Thu, 8 Sep 2016 23:01:31 +0100 Subject: [PATCH 11/26] Removed compile instructions from header --- lib_mysqludf_stomp.c | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/lib_mysqludf_stomp.c b/lib_mysqludf_stomp.c index c8a6992..9f1839d 100644 --- a/lib_mysqludf_stomp.c +++ b/lib_mysqludf_stomp.c @@ -19,17 +19,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - - Compile with (adapt the include and lib path to your environment): - > gcc -Wall -O2 -I/usr/local/include/apr-1 -I/usr/local/include/mysql \ - lib_mysqludf_stomp.c /usr/local/lib/libapr-1.so -shared \ - -o lib_mysqludf_stomp.so - > strip ./lib_mysqludf_stomp.so - - Add the functions to MySQL with: - mysql> CREATE FUNCTION stompsend RETURNS STRING SONAME "lib_mysqludf_stomp.so"; - mysql> CREATE FUNCTION stompsend1 RETURNS STRING SONAME "lib_mysqludf_stomp.so"; - mysql> CREATE FUNCTION stompsend2 RETURNS STRING SONAME "lib_mysqludf_stomp.so"; */ #if defined(_WIN32) || defined(_WIN64) || defined(__WIN32__) || defined(WIN32) From 9af94f1336eb93932dd6fda52ee491765526f414 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hugo=20Ros=C3=A1rio?= Date: Fri, 9 Sep 2016 00:53:58 +0100 Subject: [PATCH 12/26] Update with function usage examples --- README.md | 41 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 07b2006..928e344 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,4 @@ -## lib_mysqludf_stomp -### a library to send STOMP messages +## lib_mysqludf_stomp - a library to send STOMP messages Copyright 2005 LogicBlaze Inc. @@ -11,6 +10,8 @@ web of STOMP project: http://stomp.codehaus.org email: barlone@yandex.ru +Authentication support on stompsend2 added by hugorosario + ## Instructions for Ubuntu @@ -27,6 +28,42 @@ The MySQL root password will be requested for installing. #### If you get errors of missing libraries, edit the Makefile and make sure the paths are correct for your system. +### Using the functions + +Installing will provide you with 3 new functions you can use in your queries. +All of them take the same first 3 parameters. +The difference between them are just the headers you can set. +All parameters are strings : + +- stompsend(Hostname, Topic, Message); +- stompsend1(Hostname, Topic, Message, HeaderName, HeaderValue); +- stompsend2(Hostname, Topic, Message, Header1Name, Header1Value, Header2Name, Header2Value); + +## Example +To send the message "Hello broker" to the "Welcome" topic on server "127.0.0.1", just use : + +`SELECT stompsend("127.0.0.1","Welcome", "Hello broker");` + +If everything went well, you should get and "OK" response, else you will get a NULL: + +``` ++--------------------------------------------------+ +| stompsend("127.0.0.1","Welcome", "Hello broker") | ++--------------------------------------------------+ +| OK | ++--------------------------------------------------+ +1 row in set (0.00 sec) +``` + +If you need to authenticate, you can use the `stompsend2` function with the "user" and "passcode" as headers. +This function was modified in this fork to allow authentication by sending the headers on the CONNECT frame. + +`SELECT stompsend2("127.0.0.1","Welcome", "Hello broker", "login","guest","passcode","mypass");` + +#### Beware, if the credentials are invalid you will still receive an "OK" from the UDF, so you should NOT rely on that to verify the message was sent. + + + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at From 49713901ac77a57169652bef059f095e87a41243 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hugo=20Ros=C3=A1rio?= Date: Fri, 9 Sep 2016 01:02:04 +0100 Subject: [PATCH 13/26] Update README.md --- README.md | 39 ++++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 928e344..c16bd49 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,7 @@ -## lib_mysqludf_stomp - a library to send STOMP messages - -Copyright 2005 LogicBlaze Inc. - -Copyright (C) 2011 Dmitry Demianov aka barlone - -this library use part of libstomp code - -web of STOMP project: http://stomp.codehaus.org - -email: barlone@yandex.ru - -Authentication support on stompsend2 added by hugorosario +## lib_mysqludf_stomp +A MySQL UDF library to send STOMP messages to a message broker. +Supports authentication using the stompsend2 function (see details below). ## Instructions for Ubuntu @@ -30,16 +20,16 @@ The MySQL root password will be requested for installing. ### Using the functions -Installing will provide you with 3 new functions you can use in your queries. -All of them take the same first 3 parameters. -The difference between them are just the headers you can set. +The plugin will provide you with 3 new functions you can use in your queries. +All of them take the same first 3 parameters (Hostname, Topic, Message) and the others are headers. +For authentication you must use stompsend2. All parameters are strings : - stompsend(Hostname, Topic, Message); - stompsend1(Hostname, Topic, Message, HeaderName, HeaderValue); - stompsend2(Hostname, Topic, Message, Header1Name, Header1Value, Header2Name, Header2Value); -## Example +### Example To send the message "Hello broker" to the "Welcome" topic on server "127.0.0.1", just use : `SELECT stompsend("127.0.0.1","Welcome", "Hello broker");` @@ -63,6 +53,21 @@ This function was modified in this fork to allow authentication by sending the h #### Beware, if the credentials are invalid you will still receive an "OK" from the UDF, so you should NOT rely on that to verify the message was sent. +## Credits + +Copyright 2005 LogicBlaze Inc. + +Copyright (C) 2011 Dmitry Demianov aka barlone + +this library use part of libstomp code + +web of STOMP project: http://stomp.codehaus.org + +email: barlone@yandex.ru + +Authentication support on stompsend2 added by hugorosario + +## Licence Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. From ccf4596b3d97002af7fd9c697177178dc26504ce Mon Sep 17 00:00:00 2001 From: hugorosario Date: Fri, 9 Sep 2016 01:04:09 +0100 Subject: [PATCH 14/26] Changed stompsend2 to send headers on the CONNECT frame, this will allow for authentication. Changed the the install sql script to drop the functions before installing the new ones. --- lib_mysqludf_stomp.c | 2 ++ lib_mysqludf_stomp.sql | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/lib_mysqludf_stomp.c b/lib_mysqludf_stomp.c index 9f1839d..8a4fa4d 100644 --- a/lib_mysqludf_stomp.c +++ b/lib_mysqludf_stomp.c @@ -564,6 +564,8 @@ char *hdr2val = args->args[6]; frame.command = "CONNECT"; frame.headers = apr_hash_make(pool); + apr_hash_set(frame.headers, hdr1name, APR_HASH_KEY_STRING, hdr1val); + apr_hash_set(frame.headers, hdr2name, APR_HASH_KEY_STRING, hdr2val); frame.body = NULL; frame.body_length = -1; if (stomp_write(connection, &frame, pool) != APR_SUCCESS) { diff --git a/lib_mysqludf_stomp.sql b/lib_mysqludf_stomp.sql index 41df6d8..fd9805b 100644 --- a/lib_mysqludf_stomp.sql +++ b/lib_mysqludf_stomp.sql @@ -22,6 +22,10 @@ limitations under the License. */ +DROP FUNCTION IF EXISTS stompsend; +DROP FUNCTION IF EXISTS stompsend1; +DROP FUNCTION IF EXISTS stompsend2; + CREATE FUNCTION stompsend RETURNS STRING SONAME "lib_mysqludf_stomp.so"; CREATE FUNCTION stompsend1 RETURNS STRING SONAME "lib_mysqludf_stomp.so"; CREATE FUNCTION stompsend2 RETURNS STRING SONAME "lib_mysqludf_stomp.so"; From e4523afd724064442c2d8bde5c4b91b0a5efb0ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hugo=20Ros=C3=A1rio?= Date: Fri, 9 Sep 2016 01:06:17 +0100 Subject: [PATCH 15/26] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index c16bd49..028ac8b 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ The MySQL root password will be requested for installing. The plugin will provide you with 3 new functions you can use in your queries. All of them take the same first 3 parameters (Hostname, Topic, Message) and the others are headers. -For authentication you must use stompsend2. +For authentication you must use `stompsend2`. All parameters are strings : - stompsend(Hostname, Topic, Message); From 9efb0457168df75299b93054d484df716e2000a5 Mon Sep 17 00:00:00 2001 From: hugorosario Date: Fri, 9 Sep 2016 01:12:31 +0100 Subject: [PATCH 16/26] Update disclamer --- lib_mysqludf_stomp.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib_mysqludf_stomp.c b/lib_mysqludf_stomp.c index 8a4fa4d..477d762 100644 --- a/lib_mysqludf_stomp.c +++ b/lib_mysqludf_stomp.c @@ -2,6 +2,8 @@ lib_mysqludf_stomp - a library to send STOMP messages Copyright 2005 LogicBlaze Inc. Copyright (C) 2011 Dmitry Demianov aka barlone + + Authentication support on stompsend2 added by hugorosario this library use part of libstomp code From b67d59829cf5547e81919ae09dff4f34d2588d1d Mon Sep 17 00:00:00 2001 From: hugorosario Date: Fri, 9 Sep 2016 14:01:05 +0100 Subject: [PATCH 17/26] Authentication validation --- lib_mysqludf_stomp.c | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/lib_mysqludf_stomp.c b/lib_mysqludf_stomp.c index 477d762..a6abe11 100644 --- a/lib_mysqludf_stomp.c +++ b/lib_mysqludf_stomp.c @@ -58,6 +58,7 @@ typedef long long longlong; #ifdef HAVE_DLOPEN #define LIBVERSION "lib_mysqludf_stomp version 0.2.0" +#define BUFSIZE 4096 /****************************************************************************** ** function declarations @@ -575,7 +576,30 @@ char *hdr2val = args->args[6]; *null_value = 1; return NULL; } + + //// + + char buf[BUFSIZE]; + apr_socket_timeout_set(connection->socket, 15000); + while (1) { + apr_size_t len = sizeof(buf); + apr_status_t rc = apr_socket_recv(connection->socket, buf, &len); + if (rc == APR_EOF || len == 0) { + break; + } + } + apr_socket_timeout_set(connection->socket, 1 * APR_USEC_PER_SEC); + + if (!strstr(buf,"CONNECTED")){ + stomp_disconnect(&connection); + strcpy(error, "authentication failed"); + *null_value = 1; + return NULL; + } + + ////// + frame.command = "SEND"; frame.headers = apr_hash_make(pool); apr_hash_set(frame.headers, "destination", APR_HASH_KEY_STRING, topic); From 859e911f3b03fa59272838b577134f1eb6e59b04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hugo=20Ros=C3=A1rio?= Date: Fri, 9 Sep 2016 15:21:30 +0100 Subject: [PATCH 18/26] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 028ac8b..dba3208 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ If everything went well, you should get and "OK" response, else you will get a N 1 row in set (0.00 sec) ``` -If you need to authenticate, you can use the `stompsend2` function with the "user" and "passcode" as headers. +If you need to authenticate, you can use the `stompsend2` function with the "login" and "passcode" as headers. This function was modified in this fork to allow authentication by sending the headers on the CONNECT frame. `SELECT stompsend2("127.0.0.1","Welcome", "Hello broker", "login","guest","passcode","mypass");` From 5d2b8df4292991022075c0dfda5fa6b04bf84312 Mon Sep 17 00:00:00 2001 From: hugorosario Date: Sat, 10 Sep 2016 03:00:52 +0100 Subject: [PATCH 19/26] Improved authentication validation. Created method `stomp_read` to read the socket into a buffer. --- Makefile | 2 +- lib_mysqludf_stomp.c | 1274 +++++++++++++++++++++--------------------- 2 files changed, 645 insertions(+), 631 deletions(-) diff --git a/Makefile b/Makefile index dee5239..f816516 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ MYSQL_DEV_DIR=/usr/include/mysql APR_DEV_DIR=/usr/include/apr-1.0/ MYSQL_LIB_DIR=/usr/lib/mysql/plugin -APR_LIB_DIR=/usr/lib/x86_64-linux-gnu +APR_LIB_DIR=/usr/lib/i386-linux-gnu install: gcc -Wall -O2 -I$(APR_DEV_DIR) -I$(MYSQL_DEV_DIR) lib_mysqludf_stomp.c /$(APR_LIB_DIR)/libapr-1.so -shared -o $(MYSQL_LIB_DIR)/lib_mysqludf_stomp.so -fPIC diff --git a/lib_mysqludf_stomp.c b/lib_mysqludf_stomp.c index a6abe11..53028e1 100644 --- a/lib_mysqludf_stomp.c +++ b/lib_mysqludf_stomp.c @@ -1,630 +1,644 @@ -/* - lib_mysqludf_stomp - a library to send STOMP messages - Copyright 2005 LogicBlaze Inc. - Copyright (C) 2011 Dmitry Demianov aka barlone - - Authentication support on stompsend2 added by hugorosario - - this library use part of libstomp code - - web of STOMP project: http://stomp.codehaus.org/ - email: barlone@yandex.ru - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -#if defined(_WIN32) || defined(_WIN64) || defined(__WIN32__) || defined(WIN32) -#define DLLEXP __declspec(dllexport) -#else -#define DLLEXP -#endif - -#ifdef STANDARD -/* STANDARD is defined, don't use any mysql functions */ -#include -#include - -#ifdef __WIN__ -typedef unsigned __int64 ulonglong; /* Microsofts 64 bit types */ -typedef __int64 longlong; -#else -typedef unsigned long long ulonglong; -typedef long long longlong; -#endif /*__WIN__*/ - -#else -#include -#include -#include -#endif -#include -#include -#include "apr.h" -#include "apr_strings.h" -#include "apr_general.h" -#include "apr_network_io.h" -#include "apr_hash.h" - -#ifdef HAVE_DLOPEN - -#define LIBVERSION "lib_mysqludf_stomp version 0.2.0" -#define BUFSIZE 4096 - -/****************************************************************************** -** function declarations -******************************************************************************/ -#ifdef __cplusplus -extern "C" { -#endif - -DLLEXP -my_bool stompsend_init(UDF_INIT *initid, UDF_ARGS *args, char *message); -DLLEXP -void stompsend_deinit(UDF_INIT *initid); -DLLEXP -char *stompsend(UDF_INIT *initid, UDF_ARGS *args, char *result, - unsigned long *res_length, char *null_value, char *error); - -DLLEXP -my_bool stompsend1_init(UDF_INIT *initid, UDF_ARGS *args, char *message); -DLLEXP -void stompsend1_deinit(UDF_INIT *initid); -DLLEXP -char *stompsend1(UDF_INIT *initid, UDF_ARGS *args, char *result, - unsigned long *res_length, char *null_value, char *error); - -DLLEXP -my_bool stompsend2_init(UDF_INIT *initid, UDF_ARGS *args, char *message); -DLLEXP -void stompsend2_deinit(UDF_INIT *initid); -DLLEXP -char *stompsend2(UDF_INIT *initid, UDF_ARGS *args, char *result, - unsigned long *res_length, char *null_value, char *error); - -typedef struct stomp_connection { - apr_socket_t *socket; - apr_sockaddr_t *local_sa; - char *local_ip; - apr_sockaddr_t *remote_sa; - char *remote_ip; -} stomp_connection; - -typedef struct stomp_frame { - char *command; - apr_hash_t *headers; - char *body; - int body_length; -} stomp_frame; - -static apr_pool_t *pool; - -#ifdef __cplusplus -} -#endif -// ------------------------------------------------------------------------------ - -// STOMP functions - -/****************************************************************************** - * - * Used to establish a connection - * - ********************************************************************************/ -APR_DECLARE(apr_status_t) stomp_connect(stomp_connection **connection_ref, const char *hostname, int port, apr_pool_t *pool) -{ -apr_status_t rc; -int socket_family; -stomp_connection *connection=NULL; - - // - // Allocate the connection and a memory pool for the connection. - // - connection = apr_pcalloc(pool, sizeof(stomp_connection)); - if( connection == NULL ) - return APR_ENOMEM; - -#define CHECK_SUCCESS if( rc!=APR_SUCCESS ) { return rc; } - - // Look up the remote address - rc = apr_sockaddr_info_get(&connection->remote_sa, hostname, APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool); - CHECK_SUCCESS; - - // Create the socket. - socket_family = connection->remote_sa->sa.sin.sin_family; - rc = apr_socket_create(&connection->socket, socket_family, SOCK_STREAM, APR_PROTO_TCP, pool); - CHECK_SUCCESS; - - // Set socket options. - rc = apr_socket_opt_set(connection->socket, APR_SO_NONBLOCK, 1); - CHECK_SUCCESS; - rc = apr_socket_timeout_set(connection->socket, 1 * APR_USEC_PER_SEC); - CHECK_SUCCESS; - - // Try connect - rc = apr_socket_connect(connection->socket, connection->remote_sa); - CHECK_SUCCESS; - - // Get the Socket Info - rc = apr_socket_addr_get(&connection->remote_sa, APR_REMOTE, connection->socket); - CHECK_SUCCESS; - rc = apr_sockaddr_ip_get(&connection->remote_ip, connection->remote_sa); - CHECK_SUCCESS; - rc = apr_socket_addr_get(&connection->local_sa, APR_LOCAL, connection->socket); - CHECK_SUCCESS; - rc = apr_sockaddr_ip_get(&connection->local_ip, connection->local_sa); - CHECK_SUCCESS; - - -#undef CHECK_SUCCESS - - *connection_ref = connection; - return rc; -} // stomp_connect - -APR_DECLARE(apr_status_t) stomp_disconnect(stomp_connection **connection_ref) -{ -apr_status_t result, rc; -stomp_connection *connection = *connection_ref; - - if( connection_ref == NULL || *connection_ref==NULL ) - return APR_EGENERAL; - - result = APR_SUCCESS; - rc = apr_socket_shutdown(connection->socket, APR_SHUTDOWN_WRITE); - if( result!=APR_SUCCESS ) - result = rc; - - if( connection->socket != NULL ) { - rc = apr_socket_close(connection->socket); - if( result!=APR_SUCCESS ) - result = rc; - connection->socket=NULL; - } - - *connection_ref=NULL; - return rc; -} // stomp_disconnect - - -/******************************************************************************** - * - * Wrappers around the apr_socket_send and apr_socket_recv calls so that they - * read/write their buffers fully. - * - ********************************************************************************/ -APR_DECLARE(apr_status_t) stomp_write_buffer(stomp_connection *connection, const char *data, apr_size_t size) -{ -apr_size_t remaining = size; - - size=0; - while( remaining>0 ) { - apr_size_t length = remaining; - apr_status_t rc = apr_socket_send(connection->socket, data, &length); - data+=length; - remaining -= length; - // size += length; - if( rc != APR_SUCCESS ) - return rc; - - } - return APR_SUCCESS; -} // stomp_write_buffer - - -/******************************************************************************** - * - * Handles reading and writing stomp_frames to and from the connection - * - ********************************************************************************/ -APR_DECLARE(apr_status_t) stomp_write(stomp_connection *connection, stomp_frame *frame, apr_pool_t* pool) { -apr_status_t rc; - -#define CHECK_SUCCESS if( rc!=APR_SUCCESS ) { return rc; } - // Write the command. - rc = stomp_write_buffer(connection, frame->command, strlen(frame->command)); - CHECK_SUCCESS; - rc = stomp_write_buffer(connection, "\n", 1); - CHECK_SUCCESS; - - // Write the headers - if( frame->headers != NULL ) { - - apr_hash_index_t *i; - const void *key; - void *value; - for (i = apr_hash_first(NULL, frame->headers); i; i = apr_hash_next(i)) { - apr_hash_this(i, &key, NULL, &value); - - rc = stomp_write_buffer(connection, key, strlen(key)); - CHECK_SUCCESS; - rc = stomp_write_buffer(connection, ":", 1); - CHECK_SUCCESS; - rc = stomp_write_buffer(connection, value, strlen(value)); - CHECK_SUCCESS; - rc = stomp_write_buffer(connection, "\n", 1); - CHECK_SUCCESS; - } - - if(frame->body_length >= 0) { - apr_pool_t *length_pool; - char *length_string; - - apr_pool_create(&length_pool, pool); - rc = stomp_write_buffer(connection, "content-length:", 15); - CHECK_SUCCESS; - - length_string = apr_itoa(length_pool, frame->body_length); - rc = stomp_write_buffer(connection, length_string, strlen(length_string)); - CHECK_SUCCESS; - rc = stomp_write_buffer(connection, "\n", 1); - CHECK_SUCCESS; - - apr_pool_destroy(length_pool); - } - } - rc = stomp_write_buffer(connection, "\n", 1); - CHECK_SUCCESS; - - // Write the body. - if( frame->body != NULL ) { - int body_length = frame->body_length; - if(body_length < 0) - body_length = strlen(frame->body); - rc = stomp_write_buffer(connection, frame->body, body_length); - CHECK_SUCCESS; - } - rc = stomp_write_buffer(connection, "\0\n", 2); - CHECK_SUCCESS; - -#undef CHECK_SUCCESS - - return APR_SUCCESS; -} // stomp_write - - -my_bool stompsend_init(UDF_INIT *initid, UDF_ARGS *args, char *message) -{ - - /* make sure user has provided exactly three string arguments */ - if (args->arg_count != 3 || (args->arg_type[0] != STRING_RESULT) - || (args->arg_type[1] != STRING_RESULT) - || (args->arg_type[2] != STRING_RESULT)){ - strcpy(message, "stompsend requires 3 string arguments"); - return 1; - } - - if ((args->lengths[0] == 0) || (args->lengths[1] == 0) || (args->lengths[2] == 0)){ - strcpy(message, "stompsend arguments can not be empty"); - return 1; - } - - // init APR - if (apr_initialize() != APR_SUCCESS) { - strcpy(message, "stompsend could not initialize APR"); - return 2; - } - - if (apr_pool_create(&pool, NULL) != APR_SUCCESS) { - strcpy(message, "stompsend could not allocate APR pool"); - return 3; - } - - initid->maybe_null=0; - - return 0; -} - -/****************************************************************************** -** purpose: deallocate memory allocated by str_translate_init(); this func -** is called once for each query which invokes str_translate(), -** it is called after all of the calls to str_translate() are done -** receives: pointer to UDF_INIT struct (the same which was used by -** str_translate_init() and str_translate()) -** returns: nothing -******************************************************************************/ -void stompsend_deinit(UDF_INIT *initid __attribute__((unused))) -{ - apr_pool_destroy(pool); -} -/******************************************************************************/ - -char *stompsend(UDF_INIT *initid, UDF_ARGS *args, - char *result, unsigned long *res_length, - char *null_value, char *error) -{ -stomp_connection *connection; -stomp_frame frame; -char *host = args->args[0]; -char *topic = args->args[1]; -char *message = args->args[2]; - - if (stomp_connect( &connection, host, 61613, pool) != APR_SUCCESS) { - strcpy(error, "stompsend could not connect to broker"); - *null_value = 1; - return NULL; - } - - frame.command = "CONNECT"; - frame.headers = apr_hash_make(pool); - frame.body = NULL; - frame.body_length = -1; - if (stomp_write(connection, &frame, pool) != APR_SUCCESS) { - strcpy(error, "stompsend could not send CONNECT frame"); - *null_value = 1; - return NULL; - } - - frame.command = "SEND"; - frame.headers = apr_hash_make(pool); - apr_hash_set(frame.headers, "destination", APR_HASH_KEY_STRING, topic); - frame.body_length = -1; - frame.body = message; - if (stomp_write(connection, &frame, pool) != APR_SUCCESS) { - strcpy(error, "stompsend could not send SEND frame"); - *null_value = 1; - return NULL; - } - - frame.command = "DISCONNECT"; - frame.headers = NULL; - frame.body_length = -1; - frame.body = NULL; - stomp_write(connection, &frame, pool); // ignore errors - - stomp_disconnect(&connection); - - *res_length = 2; - strcpy(result, "OK"); - - return result; -} // stompsend - - -my_bool stompsend1_init(UDF_INIT *initid, UDF_ARGS *args, char *message) -{ - - /* make sure user has provided exactly three string arguments */ - if (args->arg_count != 5 || (args->arg_type[0] != STRING_RESULT) - || (args->arg_type[1] != STRING_RESULT) - || (args->arg_type[2] != STRING_RESULT) - || (args->arg_type[3] != STRING_RESULT) - || (args->arg_type[4] != STRING_RESULT)){ - strcpy(message, "stompsend1 requires 5 string arguments"); - return 1; - } - - if ((args->lengths[0] == 0) || (args->lengths[1] == 0) || (args->lengths[2] == 0) || - (args->lengths[3] == 0) || (args->lengths[4] == 0)){ - strcpy(message, "stompsend1 arguments can not be empty"); - return 1; - } - - // init APR - if (apr_initialize() != APR_SUCCESS) { - strcpy(message, "stompsend1 could not initialize APR"); - return 2; - } - - if (apr_pool_create(&pool, NULL) != APR_SUCCESS) { - strcpy(message, "stompsend1 could not allocate APR pool"); - return 3; - } - - initid->maybe_null=0; - - return 0; -} // stompsend1_init - -/****************************************************************************** -** purpose: deallocate memory allocated by str_translate_init(); this func -** is called once for each query which invokes str_translate(), -** it is called after all of the calls to str_translate() are done -** receives: pointer to UDF_INIT struct (the same which was used by -** str_translate_init() and str_translate()) -** returns: nothing -******************************************************************************/ -void stompsend1_deinit(UDF_INIT *initid __attribute__((unused))) -{ - apr_pool_destroy(pool); -} // stompsend1_deinit -/******************************************************************************/ - -char *stompsend1(UDF_INIT *initid, UDF_ARGS *args, - char *result, unsigned long *res_length, - char *null_value, char *error) -{ -stomp_connection *connection; -stomp_frame frame; -char *host = args->args[0]; -char *topic = args->args[1]; -char *message = args->args[2]; -char *hdr1name = args->args[3]; -char *hdr1val = args->args[4]; - - if (stomp_connect( &connection, host, 61613, pool) != APR_SUCCESS) { - strcpy(error, "stompsend1 could not connect to broker"); - *null_value = 1; - return NULL; - } - - frame.command = "CONNECT"; - frame.headers = apr_hash_make(pool); - frame.body = NULL; - frame.body_length = -1; - if (stomp_write(connection, &frame, pool) != APR_SUCCESS) { - strcpy(error, "stompsend1 could not send CONNECT frame"); - *null_value = 1; - return NULL; - } - - frame.command = "SEND"; - frame.headers = apr_hash_make(pool); - apr_hash_set(frame.headers, "destination", APR_HASH_KEY_STRING, topic); - apr_hash_set(frame.headers, hdr1name, APR_HASH_KEY_STRING, hdr1val); - frame.body_length = -1; - frame.body = message; - if (stomp_write(connection, &frame, pool) != APR_SUCCESS) { - strcpy(error, "stompsend1 could not send SEND frame"); - *null_value = 1; - return NULL; - } - - frame.command = "DISCONNECT"; - frame.headers = NULL; - frame.body_length = -1; - frame.body = NULL; - stomp_write(connection, &frame, pool); // ignore errors - - stomp_disconnect(&connection); - - *res_length = 2; - strcpy(result, "OK"); - - return result; -} // stompsend1 - -my_bool stompsend2_init(UDF_INIT *initid, UDF_ARGS *args, char *message) -{ - - /* make sure user has provided exactly three string arguments */ - if (args->arg_count != 7 || (args->arg_type[0] != STRING_RESULT) - || (args->arg_type[1] != STRING_RESULT) - || (args->arg_type[2] != STRING_RESULT) - || (args->arg_type[3] != STRING_RESULT) - || (args->arg_type[4] != STRING_RESULT) - || (args->arg_type[5] != STRING_RESULT) - || (args->arg_type[6] != STRING_RESULT)){ - strcpy(message, "stompsend2 requires 7 string arguments"); - return 1; - } - - if ((args->lengths[0] == 0) || (args->lengths[1] == 0) || (args->lengths[2] == 0) || - (args->lengths[3] == 0) || (args->lengths[4] == 0) || - (args->lengths[5] == 0) || (args->lengths[6] == 0)){ - strcpy(message, "stompsend2 arguments can not be empty"); - return 1; - } - - // init APR - if (apr_initialize() != APR_SUCCESS) { - strcpy(message, "stompsend2 could not initialize APR"); - return 2; - } - - if (apr_pool_create(&pool, NULL) != APR_SUCCESS) { - strcpy(message, "stompsend2 could not allocate APR pool"); - return 3; - } - - initid->maybe_null=0; - - return 0; -} // stompsend1_init - -/****************************************************************************** -** purpose: deallocate memory allocated by str_translate_init(); this func -** is called once for each query which invokes str_translate(), -** it is called after all of the calls to str_translate() are done -** receives: pointer to UDF_INIT struct (the same which was used by -** str_translate_init() and str_translate()) -** returns: nothing -******************************************************************************/ -void stompsend2_deinit(UDF_INIT *initid __attribute__((unused))) -{ - apr_pool_destroy(pool); -} // stompsend2_deinit -/******************************************************************************/ - -char *stompsend2(UDF_INIT *initid, UDF_ARGS *args, - char *result, unsigned long *res_length, - char *null_value, char *error) -{ -stomp_connection *connection; -stomp_frame frame; -char *host = args->args[0]; -char *topic = args->args[1]; -char *message = args->args[2]; -char *hdr1name = args->args[3]; -char *hdr1val = args->args[4]; -char *hdr2name = args->args[5]; -char *hdr2val = args->args[6]; - - if (stomp_connect( &connection, host, 61613, pool) != APR_SUCCESS) { - strcpy(error, "stompsend2 could not connect to broker"); - *null_value = 1; - return NULL; - } - - frame.command = "CONNECT"; - frame.headers = apr_hash_make(pool); - apr_hash_set(frame.headers, hdr1name, APR_HASH_KEY_STRING, hdr1val); - apr_hash_set(frame.headers, hdr2name, APR_HASH_KEY_STRING, hdr2val); - frame.body = NULL; - frame.body_length = -1; - if (stomp_write(connection, &frame, pool) != APR_SUCCESS) { - strcpy(error, "stompsend2 could not send CONNECT frame"); - *null_value = 1; - return NULL; - } - - //// - - - char buf[BUFSIZE]; - apr_socket_timeout_set(connection->socket, 15000); - while (1) { - apr_size_t len = sizeof(buf); - apr_status_t rc = apr_socket_recv(connection->socket, buf, &len); - if (rc == APR_EOF || len == 0) { - break; - } - } - apr_socket_timeout_set(connection->socket, 1 * APR_USEC_PER_SEC); - - if (!strstr(buf,"CONNECTED")){ - stomp_disconnect(&connection); - strcpy(error, "authentication failed"); - *null_value = 1; - return NULL; - } - - ////// - - frame.command = "SEND"; - frame.headers = apr_hash_make(pool); - apr_hash_set(frame.headers, "destination", APR_HASH_KEY_STRING, topic); - apr_hash_set(frame.headers, hdr1name, APR_HASH_KEY_STRING, hdr1val); - apr_hash_set(frame.headers, hdr2name, APR_HASH_KEY_STRING, hdr2val); - frame.body_length = -1; - frame.body = message; - if (stomp_write(connection, &frame, pool) != APR_SUCCESS) { - strcpy(error, "stompsend2 could not send SEND frame"); - *null_value = 1; - return NULL; - } - - frame.command = "DISCONNECT"; - frame.headers = NULL; - frame.body_length = -1; - frame.body = NULL; - stomp_write(connection, &frame, pool); // ignore errors - - stomp_disconnect(&connection); - - *res_length = 2; - strcpy(result, "OK"); - - return result; -} // stompsend2 - -#endif /* HAVE_DLOPEN */ +/* + lib_mysqludf_stomp - a library to send STOMP messages + Copyright 2005 LogicBlaze Inc. + Copyright (C) 2011 Dmitry Demianov aka barlone + + Authentication support on stompsend2 added by hugorosario + + this library use part of libstomp code + + web of STOMP project: http://stomp.codehaus.org/ + email: barlone@yandex.ru + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#if defined(_WIN32) || defined(_WIN64) || defined(__WIN32__) || defined(WIN32) +#define DLLEXP __declspec(dllexport) +#else +#define DLLEXP +#endif + +#ifdef STANDARD +/* STANDARD is defined, don't use any mysql functions */ +#include +#include + +#ifdef __WIN__ +typedef unsigned __int64 ulonglong; /* Microsofts 64 bit types */ +typedef __int64 longlong; +#else +typedef unsigned long long ulonglong; +typedef long long longlong; +#endif /*__WIN__*/ + +#else +#include +#include +#include +#endif +#include +#include +#include "apr.h" +#include "apr_strings.h" +#include "apr_general.h" +#include "apr_network_io.h" +#include "apr_hash.h" + +#ifdef HAVE_DLOPEN + +#define LIBVERSION "lib_mysqludf_stomp version 0.2.1" +#define BUFSIZE 4096 + +/****************************************************************************** +** function declarations +******************************************************************************/ +#ifdef __cplusplus +extern "C" { +#endif + +DLLEXP +my_bool stompsend_init(UDF_INIT *initid, UDF_ARGS *args, char *message); +DLLEXP +void stompsend_deinit(UDF_INIT *initid); +DLLEXP +char *stompsend(UDF_INIT *initid, UDF_ARGS *args, char *result, + unsigned long *res_length, char *null_value, char *error); + +DLLEXP +my_bool stompsend1_init(UDF_INIT *initid, UDF_ARGS *args, char *message); +DLLEXP +void stompsend1_deinit(UDF_INIT *initid); +DLLEXP +char *stompsend1(UDF_INIT *initid, UDF_ARGS *args, char *result, + unsigned long *res_length, char *null_value, char *error); + +DLLEXP +my_bool stompsend2_init(UDF_INIT *initid, UDF_ARGS *args, char *message); +DLLEXP +void stompsend2_deinit(UDF_INIT *initid); +DLLEXP +char *stompsend2(UDF_INIT *initid, UDF_ARGS *args, char *result, + unsigned long *res_length, char *null_value, char *error); + +typedef struct stomp_connection { + apr_socket_t *socket; + apr_sockaddr_t *local_sa; + char *local_ip; + apr_sockaddr_t *remote_sa; + char *remote_ip; +} stomp_connection; + +typedef struct stomp_frame { + char *command; + apr_hash_t *headers; + char *body; + int body_length; +} stomp_frame; + +static apr_pool_t *pool; + +#ifdef __cplusplus +} +#endif +// ------------------------------------------------------------------------------ + +// STOMP functions + +/****************************************************************************** + * + * Used to establish a connection + * + ********************************************************************************/ +APR_DECLARE(apr_status_t) stomp_connect(stomp_connection **connection_ref, const char *hostname, int port, apr_pool_t *pool) +{ +apr_status_t rc; +int socket_family; +stomp_connection *connection=NULL; + + // + // Allocate the connection and a memory pool for the connection. + // + connection = apr_pcalloc(pool, sizeof(stomp_connection)); + if( connection == NULL ) + return APR_ENOMEM; + +#define CHECK_SUCCESS if( rc!=APR_SUCCESS ) { return rc; } + + // Look up the remote address + rc = apr_sockaddr_info_get(&connection->remote_sa, hostname, APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool); + CHECK_SUCCESS; + + // Create the socket. + socket_family = connection->remote_sa->sa.sin.sin_family; + rc = apr_socket_create(&connection->socket, socket_family, SOCK_STREAM, APR_PROTO_TCP, pool); + CHECK_SUCCESS; + + // Set socket options. + rc = apr_socket_opt_set(connection->socket, APR_SO_NONBLOCK, 1); + CHECK_SUCCESS; + rc = apr_socket_timeout_set(connection->socket, 1 * APR_USEC_PER_SEC); + CHECK_SUCCESS; + + // Try connect + rc = apr_socket_connect(connection->socket, connection->remote_sa); + CHECK_SUCCESS; + + // Get the Socket Info + rc = apr_socket_addr_get(&connection->remote_sa, APR_REMOTE, connection->socket); + CHECK_SUCCESS; + rc = apr_sockaddr_ip_get(&connection->remote_ip, connection->remote_sa); + CHECK_SUCCESS; + rc = apr_socket_addr_get(&connection->local_sa, APR_LOCAL, connection->socket); + CHECK_SUCCESS; + rc = apr_sockaddr_ip_get(&connection->local_ip, connection->local_sa); + CHECK_SUCCESS; + + +#undef CHECK_SUCCESS + + *connection_ref = connection; + return rc; +} // stomp_connect + +APR_DECLARE(apr_status_t) stomp_disconnect(stomp_connection **connection_ref) +{ +apr_status_t result, rc; +stomp_connection *connection = *connection_ref; + + if( connection_ref == NULL || *connection_ref==NULL ) + return APR_EGENERAL; + + result = APR_SUCCESS; + rc = apr_socket_shutdown(connection->socket, APR_SHUTDOWN_WRITE); + if( result!=APR_SUCCESS ) + result = rc; + + if( connection->socket != NULL ) { + rc = apr_socket_close(connection->socket); + if( result!=APR_SUCCESS ) + result = rc; + connection->socket=NULL; + } + + *connection_ref=NULL; + return rc; +} // stomp_disconnect + + +/******************************************************************************** + * + * Wrappers around the apr_socket_send and apr_socket_recv calls so that they + * read/write their buffers fully. + * + ********************************************************************************/ +APR_DECLARE(apr_status_t) stomp_write_buffer(stomp_connection *connection, const char *data, apr_size_t size) +{ +apr_size_t remaining = size; + + size=0; + while( remaining>0 ) { + apr_size_t length = remaining; + apr_status_t rc = apr_socket_send(connection->socket, data, &length); + data+=length; + remaining -= length; + // size += length; + if( rc != APR_SUCCESS ) + return rc; + + } + return APR_SUCCESS; +} // stomp_write_buffer + + +/******************************************************************************** + * + * Handles reading and writing stomp_frames to and from the connection + * + ********************************************************************************/ +APR_DECLARE(apr_status_t) stomp_write(stomp_connection *connection, stomp_frame *frame, apr_pool_t* pool) { +apr_status_t rc; + +#define CHECK_SUCCESS if( rc!=APR_SUCCESS ) { return rc; } + // Write the command. + rc = stomp_write_buffer(connection, frame->command, strlen(frame->command)); + CHECK_SUCCESS; + rc = stomp_write_buffer(connection, "\n", 1); + CHECK_SUCCESS; + + // Write the headers + if( frame->headers != NULL ) { + + apr_hash_index_t *i; + const void *key; + void *value; + for (i = apr_hash_first(NULL, frame->headers); i; i = apr_hash_next(i)) { + apr_hash_this(i, &key, NULL, &value); + + rc = stomp_write_buffer(connection, key, strlen(key)); + CHECK_SUCCESS; + rc = stomp_write_buffer(connection, ":", 1); + CHECK_SUCCESS; + rc = stomp_write_buffer(connection, value, strlen(value)); + CHECK_SUCCESS; + rc = stomp_write_buffer(connection, "\n", 1); + CHECK_SUCCESS; + } + + if(frame->body_length >= 0) { + apr_pool_t *length_pool; + char *length_string; + + apr_pool_create(&length_pool, pool); + rc = stomp_write_buffer(connection, "content-length:", 15); + CHECK_SUCCESS; + + length_string = apr_itoa(length_pool, frame->body_length); + rc = stomp_write_buffer(connection, length_string, strlen(length_string)); + CHECK_SUCCESS; + rc = stomp_write_buffer(connection, "\n", 1); + CHECK_SUCCESS; + + apr_pool_destroy(length_pool); + } + } + rc = stomp_write_buffer(connection, "\n", 1); + CHECK_SUCCESS; + + // Write the body. + if( frame->body != NULL ) { + int body_length = frame->body_length; + if(body_length < 0) + body_length = strlen(frame->body); + rc = stomp_write_buffer(connection, frame->body, body_length); + CHECK_SUCCESS; + } + rc = stomp_write_buffer(connection, "\0\n", 2); + CHECK_SUCCESS; + +#undef CHECK_SUCCESS + + return APR_SUCCESS; +} // stomp_write + + +my_bool stompsend_init(UDF_INIT *initid, UDF_ARGS *args, char *message) +{ + + /* make sure user has provided exactly three string arguments */ + if (args->arg_count != 3 || (args->arg_type[0] != STRING_RESULT) + || (args->arg_type[1] != STRING_RESULT) + || (args->arg_type[2] != STRING_RESULT)){ + strcpy(message, "stompsend requires 3 string arguments"); + return 1; + } + + if ((args->lengths[0] == 0) || (args->lengths[1] == 0) || (args->lengths[2] == 0)){ + strcpy(message, "stompsend arguments can not be empty"); + return 1; + } + + // init APR + if (apr_initialize() != APR_SUCCESS) { + strcpy(message, "stompsend could not initialize APR"); + return 2; + } + + if (apr_pool_create(&pool, NULL) != APR_SUCCESS) { + strcpy(message, "stompsend could not allocate APR pool"); + return 3; + } + + initid->maybe_null=0; + + return 0; +} + +/****************************************************************************** +** purpose: deallocate memory allocated by str_translate_init(); this func +** is called once for each query which invokes str_translate(), +** it is called after all of the calls to str_translate() are done +** receives: pointer to UDF_INIT struct (the same which was used by +** str_translate_init() and str_translate()) +** returns: nothing +******************************************************************************/ +void stompsend_deinit(UDF_INIT *initid __attribute__((unused))) +{ + apr_pool_destroy(pool); +} +/******************************************************************************/ + +char *stompsend(UDF_INIT *initid, UDF_ARGS *args, + char *result, unsigned long *res_length, + char *null_value, char *error) +{ +stomp_connection *connection; +stomp_frame frame; +char *host = args->args[0]; +char *topic = args->args[1]; +char *message = args->args[2]; + + if (stomp_connect( &connection, host, 61613, pool) != APR_SUCCESS) { + strcpy(error, "stompsend could not connect to broker"); + *null_value = 1; + return NULL; + } + + frame.command = "CONNECT"; + frame.headers = apr_hash_make(pool); + frame.body = NULL; + frame.body_length = -1; + if (stomp_write(connection, &frame, pool) != APR_SUCCESS) { + strcpy(error, "stompsend could not send CONNECT frame"); + *null_value = 1; + return NULL; + } + + frame.command = "SEND"; + frame.headers = apr_hash_make(pool); + apr_hash_set(frame.headers, "destination", APR_HASH_KEY_STRING, topic); + frame.body_length = -1; + frame.body = message; + if (stomp_write(connection, &frame, pool) != APR_SUCCESS) { + strcpy(error, "stompsend could not send SEND frame"); + *null_value = 1; + return NULL; + } + + frame.command = "DISCONNECT"; + frame.headers = NULL; + frame.body_length = -1; + frame.body = NULL; + stomp_write(connection, &frame, pool); // ignore errors + + stomp_disconnect(&connection); + + *res_length = 2; + strcpy(result, "OK"); + + return result; +} // stompsend + + +my_bool stompsend1_init(UDF_INIT *initid, UDF_ARGS *args, char *message) +{ + + /* make sure user has provided exactly three string arguments */ + if (args->arg_count != 5 || (args->arg_type[0] != STRING_RESULT) + || (args->arg_type[1] != STRING_RESULT) + || (args->arg_type[2] != STRING_RESULT) + || (args->arg_type[3] != STRING_RESULT) + || (args->arg_type[4] != STRING_RESULT)){ + strcpy(message, "stompsend1 requires 5 string arguments"); + return 1; + } + + if ((args->lengths[0] == 0) || (args->lengths[1] == 0) || (args->lengths[2] == 0) || + (args->lengths[3] == 0) || (args->lengths[4] == 0)){ + strcpy(message, "stompsend1 arguments can not be empty"); + return 1; + } + + // init APR + if (apr_initialize() != APR_SUCCESS) { + strcpy(message, "stompsend1 could not initialize APR"); + return 2; + } + + if (apr_pool_create(&pool, NULL) != APR_SUCCESS) { + strcpy(message, "stompsend1 could not allocate APR pool"); + return 3; + } + + initid->maybe_null=0; + + return 0; +} // stompsend1_init + +/****************************************************************************** +** purpose: deallocate memory allocated by str_translate_init(); this func +** is called once for each query which invokes str_translate(), +** it is called after all of the calls to str_translate() are done +** receives: pointer to UDF_INIT struct (the same which was used by +** str_translate_init() and str_translate()) +** returns: nothing +******************************************************************************/ +void stompsend1_deinit(UDF_INIT *initid __attribute__((unused))) +{ + apr_pool_destroy(pool); +} // stompsend1_deinit +/******************************************************************************/ + +char *stompsend1(UDF_INIT *initid, UDF_ARGS *args, + char *result, unsigned long *res_length, + char *null_value, char *error) +{ +stomp_connection *connection; +stomp_frame frame; +char *host = args->args[0]; +char *topic = args->args[1]; +char *message = args->args[2]; +char *hdr1name = args->args[3]; +char *hdr1val = args->args[4]; + + if (stomp_connect( &connection, host, 61613, pool) != APR_SUCCESS) { + strcpy(error, "stompsend1 could not connect to broker"); + *null_value = 1; + return NULL; + } + + frame.command = "CONNECT"; + frame.headers = apr_hash_make(pool); + frame.body = NULL; + frame.body_length = -1; + if (stomp_write(connection, &frame, pool) != APR_SUCCESS) { + strcpy(error, "stompsend1 could not send CONNECT frame"); + *null_value = 1; + return NULL; + } + + frame.command = "SEND"; + frame.headers = apr_hash_make(pool); + apr_hash_set(frame.headers, "destination", APR_HASH_KEY_STRING, topic); + apr_hash_set(frame.headers, hdr1name, APR_HASH_KEY_STRING, hdr1val); + frame.body_length = -1; + frame.body = message; + if (stomp_write(connection, &frame, pool) != APR_SUCCESS) { + strcpy(error, "stompsend1 could not send SEND frame"); + *null_value = 1; + return NULL; + } + + frame.command = "DISCONNECT"; + frame.headers = NULL; + frame.body_length = -1; + frame.body = NULL; + stomp_write(connection, &frame, pool); // ignore errors + + stomp_disconnect(&connection); + + *res_length = 2; + strcpy(result, "OK"); + + return result; +} // stompsend1 + +my_bool stompsend2_init(UDF_INIT *initid, UDF_ARGS *args, char *message) +{ + + /* make sure user has provided exactly three string arguments */ + if (args->arg_count != 7 || (args->arg_type[0] != STRING_RESULT) + || (args->arg_type[1] != STRING_RESULT) + || (args->arg_type[2] != STRING_RESULT) + || (args->arg_type[3] != STRING_RESULT) + || (args->arg_type[4] != STRING_RESULT) + || (args->arg_type[5] != STRING_RESULT) + || (args->arg_type[6] != STRING_RESULT)){ + strcpy(message, "stompsend2 requires 7 string arguments"); + return 1; + } + + if ((args->lengths[0] == 0) || (args->lengths[1] == 0) || (args->lengths[2] == 0) || + (args->lengths[3] == 0) || (args->lengths[4] == 0) || + (args->lengths[5] == 0) || (args->lengths[6] == 0)){ + strcpy(message, "stompsend2 arguments can not be empty"); + return 1; + } + + // init APR + if (apr_initialize() != APR_SUCCESS) { + strcpy(message, "stompsend2 could not initialize APR"); + return 2; + } + + if (apr_pool_create(&pool, NULL) != APR_SUCCESS) { + strcpy(message, "stompsend2 could not allocate APR pool"); + return 3; + } + + initid->maybe_null=0; + + return 0; +} // stompsend1_init + +/****************************************************************************** +** purpose: deallocate memory allocated by str_translate_init(); this func +** is called once for each query which invokes str_translate(), +** it is called after all of the calls to str_translate() are done +** receives: pointer to UDF_INIT struct (the same which was used by +** str_translate_init() and str_translate()) +** returns: nothing +******************************************************************************/ +void stompsend2_deinit(UDF_INIT *initid __attribute__((unused))) +{ + apr_pool_destroy(pool); +} // stompsend2_deinit +/******************************************************************************/ + + + +APR_DECLARE(apr_status_t) stomp_read(stomp_connection *connection, char *out){ + apr_status_t rc; + char buf[BUFSIZE]; + apr_interval_time_t oldTimeout = 1 * APR_USEC_PER_SEC; + apr_socket_timeout_get(connection->socket, &oldTimeout); + apr_socket_timeout_set(connection->socket, 15000); + while (1) { + apr_size_t len = sizeof(buf); + rc = apr_socket_recv(connection->socket, buf, &len); + if (rc == APR_EOF || len == 0) { + rc = APR_SUCCESS; + strcpy(out, buf); + break; + } + } + apr_socket_timeout_set(connection->socket, oldTimeout); + return rc; +} + + +char *stompsend2(UDF_INIT *initid, UDF_ARGS *args, + char *result, unsigned long *res_length, + char *null_value, char *error) +{ +stomp_connection *connection; +stomp_frame frame; +char *host = args->args[0]; +char *topic = args->args[1]; +char *message = args->args[2]; +char *hdr1name = args->args[3]; +char *hdr1val = args->args[4]; +char *hdr2name = args->args[5]; +char *hdr2val = args->args[6]; + + if (stomp_connect( &connection, host, 61613, pool) != APR_SUCCESS) { + strcpy(error, "stompsend2 could not connect to broker"); + *null_value = 1; + return NULL; + } + + frame.command = "CONNECT"; + frame.headers = apr_hash_make(pool); + apr_hash_set(frame.headers, hdr1name, APR_HASH_KEY_STRING, hdr1val); + apr_hash_set(frame.headers, hdr2name, APR_HASH_KEY_STRING, hdr2val); + frame.body = NULL; + frame.body_length = -1; + if (stomp_write(connection, &frame, pool) != APR_SUCCESS) { + strcpy(error, "stompsend2 could not send CONNECT frame"); + *null_value = 1; + return NULL; + } + + // validate CONNECT response + + char buf[BUFSIZE]; + if ((stomp_read(connection, buf) == APR_SUCCESS) && (!strstr(buf,"CONNECTED"))){ + stomp_disconnect(&connection); + //strcpy(result, buf); + //*res_length = strlen(result); + //return result; + strcpy(error, "did not receive CONNECTED response"); + *null_value = 1; + return NULL; + } + + //CONNECT frame was successful, carry on with sending the message + + frame.command = "SEND"; + frame.headers = apr_hash_make(pool); + apr_hash_set(frame.headers, "destination", APR_HASH_KEY_STRING, topic); + apr_hash_set(frame.headers, hdr1name, APR_HASH_KEY_STRING, hdr1val); + apr_hash_set(frame.headers, hdr2name, APR_HASH_KEY_STRING, hdr2val); + frame.body_length = -1; + frame.body = message; + if (stomp_write(connection, &frame, pool) != APR_SUCCESS) { + strcpy(error, "stompsend2 could not send SEND frame"); + *null_value = 1; + return NULL; + } + + frame.command = "DISCONNECT"; + frame.headers = NULL; + frame.body_length = -1; + frame.body = NULL; + stomp_write(connection, &frame, pool); // ignore errors + + stomp_disconnect(&connection); + + *res_length = 2; + strcpy(result, "OK"); + + return result; +} // stompsend2 + +#endif /* HAVE_DLOPEN */ From b237543a2102717e71f7c1b3759fdb5e64759897 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hugo=20Ros=C3=A1rio?= Date: Sat, 10 Sep 2016 03:08:26 +0100 Subject: [PATCH 20/26] Reverted Makefile to use /usr/lib/x86_64-linux-gnu Previous makefile was looking for APR libraries in i386-linux-gnu folder. Reverting back to x86_64-linux-gnu because its the standard system nowadays. --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index f816516..dee5239 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ MYSQL_DEV_DIR=/usr/include/mysql APR_DEV_DIR=/usr/include/apr-1.0/ MYSQL_LIB_DIR=/usr/lib/mysql/plugin -APR_LIB_DIR=/usr/lib/i386-linux-gnu +APR_LIB_DIR=/usr/lib/x86_64-linux-gnu install: gcc -Wall -O2 -I$(APR_DEV_DIR) -I$(MYSQL_DEV_DIR) lib_mysqludf_stomp.c /$(APR_LIB_DIR)/libapr-1.so -shared -o $(MYSQL_LIB_DIR)/lib_mysqludf_stomp.so -fPIC From 98d2a2e06b00a9070524458d9f4739df3e91081f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hugo=20Ros=C3=A1rio?= Date: Sat, 10 Sep 2016 03:13:39 +0100 Subject: [PATCH 21/26] Removed warning Since this new version validates authentication and responds NULL if the credentials are invalid, this warning is no longer required. --- README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.md b/README.md index dba3208..efdb95e 100644 --- a/README.md +++ b/README.md @@ -50,8 +50,7 @@ This function was modified in this fork to allow authentication by sending the h `SELECT stompsend2("127.0.0.1","Welcome", "Hello broker", "login","guest","passcode","mypass");` -#### Beware, if the credentials are invalid you will still receive an "OK" from the UDF, so you should NOT rely on that to verify the message was sent. - +If the credentials are invalid, you should receive a NULL response. ## Credits From 3ce2e18afde2397c860879d2f564d7f0346e0157 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hugo=20Ros=C3=A1rio?= Date: Tue, 13 Sep 2016 16:28:43 +0100 Subject: [PATCH 22/26] Add gcc on dependency package installation --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index efdb95e..3b95f14 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ Supports authentication using the stompsend2 function (see details below). ### Install Dependencies In order do compile the plugin, you need to install Apache Portable Runtime (APR) and MySQL client development packages. -`sudo apt-get install libapr1 libapr1-dev libmysqlclient-dev` +`sudo apt-get install gcc libapr1 libapr1-dev libmysqlclient-dev` ### Compile & Install To install the UDF, just run the provided install script which will compile and install the library. From a7bc881f1e6f6e484676eb8a4b86fd6482d8cd26 Mon Sep 17 00:00:00 2001 From: hugorosario Date: Thu, 15 Sep 2016 14:02:39 +0100 Subject: [PATCH 23/26] Disabled Timeouts --- lib_mysqludf_stomp.c | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/lib_mysqludf_stomp.c b/lib_mysqludf_stomp.c index 53028e1..e0345e2 100644 --- a/lib_mysqludf_stomp.c +++ b/lib_mysqludf_stomp.c @@ -550,9 +550,9 @@ void stompsend2_deinit(UDF_INIT *initid __attribute__((unused))) APR_DECLARE(apr_status_t) stomp_read(stomp_connection *connection, char *out){ apr_status_t rc; char buf[BUFSIZE]; - apr_interval_time_t oldTimeout = 1 * APR_USEC_PER_SEC; - apr_socket_timeout_get(connection->socket, &oldTimeout); - apr_socket_timeout_set(connection->socket, 15000); + //apr_interval_time_t oldTimeout = 1 * APR_USEC_PER_SEC; + //apr_socket_timeout_get(connection->socket, &oldTimeout); + //apr_socket_timeout_set(connection->socket, 15000); while (1) { apr_size_t len = sizeof(buf); rc = apr_socket_recv(connection->socket, buf, &len); @@ -562,7 +562,7 @@ APR_DECLARE(apr_status_t) stomp_read(stomp_connection *connection, char *out){ break; } } - apr_socket_timeout_set(connection->socket, oldTimeout); + //apr_socket_timeout_set(connection->socket, oldTimeout); return rc; } @@ -604,13 +604,15 @@ char *hdr2val = args->args[6]; char buf[BUFSIZE]; if ((stomp_read(connection, buf) == APR_SUCCESS) && (!strstr(buf,"CONNECTED"))){ stomp_disconnect(&connection); - //strcpy(result, buf); - //*res_length = strlen(result); - //return result; +// strcpy(result, buf); +// *res_length = strlen(result); +// return result; strcpy(error, "did not receive CONNECTED response"); *null_value = 1; + strcpy(buf,""); return NULL; } + strcpy(buf,""); //CONNECT frame was successful, carry on with sending the message From c9e91ad77bac2ec2843400890444303401dbae3c Mon Sep 17 00:00:00 2001 From: hugorosario Date: Fri, 16 Sep 2016 01:50:55 +0100 Subject: [PATCH 24/26] Implemented the stomp_read code from libstomp sources and replaced the previous code. Some stuff removed from original source. --- Makefile | 3 +- lib_mysqludf_stomp.c | 291 +++++++++++++++++++++++++++++++++++++------ 2 files changed, 258 insertions(+), 36 deletions(-) diff --git a/Makefile b/Makefile index dee5239..dab5030 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,8 @@ MYSQL_DEV_DIR=/usr/include/mysql APR_DEV_DIR=/usr/include/apr-1.0/ MYSQL_LIB_DIR=/usr/lib/mysql/plugin APR_LIB_DIR=/usr/lib/x86_64-linux-gnu +#APR_LIB_DIR=/usr/lib/i386-linux-gnu install: - gcc -Wall -O2 -I$(APR_DEV_DIR) -I$(MYSQL_DEV_DIR) lib_mysqludf_stomp.c /$(APR_LIB_DIR)/libapr-1.so -shared -o $(MYSQL_LIB_DIR)/lib_mysqludf_stomp.so -fPIC + gcc -Wall -O2 -I$(APR_DEV_DIR) -I$(MYSQL_DEV_DIR) lib_mysqludf_stomp.c $(APR_LIB_DIR)/libapr-1.so -shared -o $(MYSQL_LIB_DIR)/lib_mysqludf_stomp.so -fPIC diff --git a/lib_mysqludf_stomp.c b/lib_mysqludf_stomp.c index e0345e2..5fcb34c 100644 --- a/lib_mysqludf_stomp.c +++ b/lib_mysqludf_stomp.c @@ -106,14 +106,29 @@ typedef struct stomp_frame { int body_length; } stomp_frame; +typedef struct data_block_list { + char data[1024]; + struct data_block_list *next; +} data_block_list; + static apr_pool_t *pool; #ifdef __cplusplus } #endif + +// ------------------------------------------------------------------------------ +// Helper functions // ------------------------------------------------------------------------------ +int prefix(const char *pre, const char *str){ + return strncmp(pre, str, strlen(pre)); +} + +// ------------------------------------------------------------------------------ // STOMP functions +// ------------------------------------------------------------------------------ + /****************************************************************************** * @@ -220,6 +235,153 @@ apr_size_t remaining = size; return APR_SUCCESS; } // stomp_write_buffer +APR_DECLARE(apr_status_t) stomp_read_line(stomp_connection *connection, char **data, int* length, apr_pool_t *pool) +{ + apr_pool_t *tpool; + apr_status_t rc; + data_block_list *head, *tail; + apr_size_t i=0; + apr_size_t bytesRead=0; + char *p; + + rc = apr_pool_create(&tpool, pool); + if( rc != APR_SUCCESS ) { + return rc; + } + + head = tail = apr_pcalloc(tpool, sizeof(data_block_list)); + if( head == NULL ) + return APR_ENOMEM; + +#define CHECK_SUCCESS if( rc!=APR_SUCCESS ) { apr_pool_destroy(tpool); return rc; } + + while( 1 ) { + + apr_size_t length = 1; + apr_status_t rc = apr_socket_recv(connection->socket, tail->data+i, &length); + CHECK_SUCCESS; + + if( length==1 ) { + i++; + bytesRead++; + + // Keep reading bytes till end of line + if( tail->data[i-1]=='\n') { + // Null terminate the string instead of having the newline + tail->data[i-1] = 0; + break; + } else if( tail->data[i-1]==0 ) { + // Encountered 0 before end of line + apr_pool_destroy(tpool); + return APR_EGENERAL; + } + + // Do we need to allocate a new block? + if( i >= sizeof( tail->data) ) { + tail->next = apr_pcalloc(tpool, sizeof(data_block_list)); + if( tail->next == NULL ) { + apr_pool_destroy(tpool); + return APR_ENOMEM; + } + tail=tail->next; + i=0; + } + } + } + +#undef CHECK_SUCCESS + // Now we have the whole frame and know how big it is. Allocate it's buffer + *data = apr_pcalloc(pool, bytesRead); + p = *data; + if( p==NULL ) { + apr_pool_destroy(tpool); + return APR_ENOMEM; + } + + // Copy the frame over to the new buffer. + *length = bytesRead - 1; + for( ;head != NULL; head = head->next ) { + int len = bytesRead > sizeof(head->data) ? sizeof(head->data) : bytesRead; + memcpy(p,head->data,len); + p+=len; + bytesRead-=len; + } + + apr_pool_destroy(tpool); + return APR_SUCCESS; +} + +APR_DECLARE(apr_status_t) stomp_read_buffer(stomp_connection *connection, char **data, apr_pool_t *pool) +{ + apr_pool_t *tpool; + apr_status_t rc; + data_block_list *head, *tail; + apr_size_t i=0; + apr_size_t bytesRead=0; + char *p; + + rc = apr_pool_create(&tpool, pool); + if( rc != APR_SUCCESS ) { + return rc; + } + + head = tail = apr_pcalloc(tpool, sizeof(data_block_list)); + if( head == NULL ) + return APR_ENOMEM; + +#define CHECK_SUCCESS if( rc!=APR_SUCCESS ) { apr_pool_destroy(tpool); return rc; } + + // Keep reading bytes till end of frame is encountered. + while( 1 ) { + + apr_size_t length = 1; + apr_status_t rc = apr_socket_recv(connection->socket, tail->data+i, &length); + CHECK_SUCCESS; + + if( length==1 ) { + i++; + bytesRead++; + + // Keep reading bytes till end of frame + + if( tail->data[i-1]==0 ) { + break; + } + + // Do we need to allocate a new block? + if( i >= sizeof( tail->data) ) { + tail->next = apr_pcalloc(tpool, sizeof(data_block_list)); + if( tail->next == NULL ) { + apr_pool_destroy(tpool); + return APR_ENOMEM; + } + tail=tail->next; + i=0; + } + } + } +#undef CHECK_SUCCESS + + // Now we have the whole frame and know how big it is. Allocate it's buffer + *data = apr_pcalloc(pool, bytesRead); + p = *data; + if( p==NULL ) { + apr_pool_destroy(tpool); + return APR_ENOMEM; + } + + // Copy the frame over to the new buffer. + for( ;head != NULL; head = head->next ) { + int len = bytesRead > sizeof(head->data) ? sizeof(head->data) : bytesRead; + memcpy(p,head->data,len); + p+=len; + bytesRead-=len; + } + + apr_pool_destroy(tpool); + return APR_SUCCESS; +} + /******************************************************************************** * @@ -292,6 +454,76 @@ apr_status_t rc; } // stomp_write +APR_DECLARE(apr_status_t) stomp_read(stomp_connection *connection, stomp_frame **frame, apr_pool_t *pool) { + + apr_status_t rc; + stomp_frame *f; + + f = apr_pcalloc(pool, sizeof(stomp_frame)); + if( f == NULL ) + return APR_ENOMEM; + + f->headers = apr_hash_make(pool); + if( f->headers == NULL ) + return APR_ENOMEM; + +#define CHECK_SUCCESS if( rc!=APR_SUCCESS ) { return rc; } + + // Parse the frame out. + char *p; + int length; + + // Parse the command. + rc = stomp_read_line(connection, &p, &length, pool); + CHECK_SUCCESS; + + f->command = p; + + + // Start parsing the headers. + while( 1 ) { + rc = stomp_read_line(connection, &p, &length, pool); + CHECK_SUCCESS; + + // Done with headers + if(length == 0) + break; + + { + // Parse the header line. + char *p2; + void *key; + void *value; + + p2 = strstr(p,":"); + if( p2 == NULL ) { + // Expected at 1 : to delimit the key from the value. + return APR_EGENERAL; + } + + // Null terminate the key + *p2=0; + key = p; + + // The rest if the value. + value = p2+1; + + // Insert key/value into hash table. + apr_hash_set(f->headers, key, APR_HASH_KEY_STRING, value); + } + } + + + // The remainder of the buffer (including the \n at the end) is the body) + rc = stomp_read_buffer(connection, &f->body, pool); + CHECK_SUCCESS; + + +#undef CHECK_SUCCESS + *frame = f; + return APR_SUCCESS; +} + my_bool stompsend_init(UDF_INIT *initid, UDF_ARGS *args, char *message) { @@ -324,6 +556,7 @@ my_bool stompsend_init(UDF_INIT *initid, UDF_ARGS *args, char *message) return 0; } + /****************************************************************************** ** purpose: deallocate memory allocated by str_translate_init(); this func ** is called once for each query which invokes str_translate(), @@ -531,6 +764,8 @@ my_bool stompsend2_init(UDF_INIT *initid, UDF_ARGS *args, char *message) return 0; } // stompsend1_init + + /****************************************************************************** ** purpose: deallocate memory allocated by str_translate_init(); this func ** is called once for each query which invokes str_translate(), @@ -545,34 +780,13 @@ void stompsend2_deinit(UDF_INIT *initid __attribute__((unused))) } // stompsend2_deinit /******************************************************************************/ - - -APR_DECLARE(apr_status_t) stomp_read(stomp_connection *connection, char *out){ - apr_status_t rc; - char buf[BUFSIZE]; - //apr_interval_time_t oldTimeout = 1 * APR_USEC_PER_SEC; - //apr_socket_timeout_get(connection->socket, &oldTimeout); - //apr_socket_timeout_set(connection->socket, 15000); - while (1) { - apr_size_t len = sizeof(buf); - rc = apr_socket_recv(connection->socket, buf, &len); - if (rc == APR_EOF || len == 0) { - rc = APR_SUCCESS; - strcpy(out, buf); - break; - } - } - //apr_socket_timeout_set(connection->socket, oldTimeout); - return rc; -} - - char *stompsend2(UDF_INIT *initid, UDF_ARGS *args, char *result, unsigned long *res_length, char *null_value, char *error) { stomp_connection *connection; stomp_frame frame; +stomp_frame *readframe; char *host = args->args[0]; char *topic = args->args[1]; char *message = args->args[2]; @@ -599,21 +813,28 @@ char *hdr2val = args->args[6]; return NULL; } - // validate CONNECT response - - char buf[BUFSIZE]; - if ((stomp_read(connection, buf) == APR_SUCCESS) && (!strstr(buf,"CONNECTED"))){ - stomp_disconnect(&connection); -// strcpy(result, buf); -// *res_length = strlen(result); -// return result; - strcpy(error, "did not receive CONNECTED response"); + // validate CONNECT response frame + + if (stomp_read(connection, &readframe, pool) != APR_SUCCESS) { + strcpy(error, "stompsend2 could not receive response"); *null_value = 1; - strcpy(buf,""); - return NULL; + return NULL; } - strcpy(buf,""); - + + if (prefix("ERROR", readframe->command) == 0) { + if (strlen(readframe->body) > 0){ + char * line = readframe->body; + line[strlen(line) - 1] = '\0'; + strcpy(result, line); + *res_length = strlen(result); + return result; + }else{ + strcpy(error, "stompsend2 did not receive a CONNECTED response"); + *null_value = 1; + return NULL; + } + } + //CONNECT frame was successful, carry on with sending the message frame.command = "SEND"; From 2dff5a254fa1112133cdb65d848991cb954b0ee3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hugo=20Ros=C3=A1rio?= Date: Fri, 16 Sep 2016 02:00:51 +0100 Subject: [PATCH 25/26] Update info about the response of failed authentications. --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 3b95f14..6995fc0 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ This function was modified in this fork to allow authentication by sending the h `SELECT stompsend2("127.0.0.1","Welcome", "Hello broker", "login","guest","passcode","mypass");` -If the credentials are invalid, you should receive a NULL response. +If the credentials are invalid, you should receive the body of the message sent by the broker (ex: "Access refused for client 'guest'."). ## Credits From af6a3e6f8bb2e7b79a5ab69082a03a2810c56129 Mon Sep 17 00:00:00 2001 From: hugorosario Date: Fri, 18 Nov 2016 10:54:18 +0000 Subject: [PATCH 26/26] Removed authentication validation --- lib_mysqludf_stomp.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib_mysqludf_stomp.c b/lib_mysqludf_stomp.c index 5fcb34c..61c7657 100644 --- a/lib_mysqludf_stomp.c +++ b/lib_mysqludf_stomp.c @@ -786,7 +786,7 @@ char *stompsend2(UDF_INIT *initid, UDF_ARGS *args, { stomp_connection *connection; stomp_frame frame; -stomp_frame *readframe; +// stomp_frame *readframe; char *host = args->args[0]; char *topic = args->args[1]; char *message = args->args[2]; @@ -814,7 +814,7 @@ char *hdr2val = args->args[6]; } // validate CONNECT response frame - +/* if (stomp_read(connection, &readframe, pool) != APR_SUCCESS) { strcpy(error, "stompsend2 could not receive response"); *null_value = 1; @@ -834,7 +834,7 @@ char *hdr2val = args->args[6]; return NULL; } } - +*/ //CONNECT frame was successful, carry on with sending the message frame.command = "SEND";