REBOL [ Title: "Simple general purpose messaging engine" Purpose: { Provide a messaging engine for applications. } Author: "Gabriele Santilli" EMail: giesse@rebol.it File: %messaging.r License: { Copyright (c) 2004, Gabriele Santilli All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: * Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. * The name of Gabriele Santilli may not be used to endorse or promote products derived from this software without specific prior written permission. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. } Date: 23-Feb-2005 Version: 1.27.0 ; majorv.minorv.status ; status: 0: unfinished; 1: testing; 2: stable History: [ 21-Dec-2004 1.1.0 "History start" 21-Dec-2004 1.2.0 "Design skeleton" 22-Dec-2004 1.3.0 "send-message (first implementation)" 22-Dec-2004 1.4.0 "Implementing listen-message" 23-Dec-2004 1.5.0 "Implementing listen-message" 23-Dec-2004 1.6.0 "Testing" 23-Dec-2004 1.7.0 "Testing" 24-Dec-2004 1.8.0 "Now based on encoders, decoders and handlers" 4-Jan-2005 1.9.0 "Implemented HTTP mini server, testing" 4-Jan-2005 1.10.0 "Fixed some bugs" 5-Jan-2005 1.11.0 "Improved level 1 [en|de]coding, testing" 7-Jan-2005 1.12.0 "Added documentation" 11-Jan-2005 1.13.0 "Added /BINARY to OPEN" 15-Jan-2005 1.14.0 "Testing" 15-Jan-2005 1.15.0 "Added QAD PARSE-HEADER patch, fixed some bugs, testing" 18-Jan-2005 1.16.0 "Removed some debug prints" 18-Jan-2005 1.17.0 "Handlers now get the port too as argument" 20-Jan-2005 1.18.0 "Rewrite with async handlers and extensible functions for encoders and decoders" 20-Jan-2005 1.19.0 "Fixed non-async UDP send-message" 20-Jan-2005 1.20.0 "...really" 20-Jan-2005 1.21.0 "Fixed a number of bugs" 24-Jan-2005 1.22.0 "Fixed a bug" 26-Jan-2005 1.23.0 "Added timeout for HTTP connections too; using DO-AFTER 0 in awake functions" 26-Jan-2005 1.24.0 "Fixed a bug in http-awake" 28-Jan-2005 1.25.0 "Added debug prints for Chord" 31-Jan-2005 1.26.0 "DO-AFTER for listen awakes" 23-Feb-2005 1.27.0 "Still testing" ] ] debug: func [ condition output ] [ if condition [append-log 10 reform output] ] ; comment the following line if you are using autodoc.r ;#do [document: func [text] [none] none] #do [document { ===Simple general purpose messaging engine (messaging.r) ---Encoders, decoders and handlers Three functions are used to process messages: ENCODE-MESSAGE, DECODE-MESSAGE and HANDLE-MESSAGE. When sending a message, it is processed by ENCODE-MESSAGE; the result is then sent to the destination; there, the coming message is processed by DECODE-MESSAGE, and then by HANDLE-MESSAGE. This function calls the functions in the HANDLERS block, in order. The first handler returning a non-none value will be considered as the message handler, and its result will be sent back. This result is thus processed by ENCODE-MESSAGE sent as a response; so the source gets this as a response and processes it DECODE-MESSAGE to obtain the final result of the request. To add an handler to the list just insert it to the HANDLERS block. It must be a function accepting three arguments: the port, the message, and a callback function. The callback function must be called with the result or NONE. It is possible to extend the ENCODE-MESSAGE and DECODE-MESSAGE functions using the EXTEND function since they are defined as EFUNCs. By default messages are just molded and compressed for encoding, and decompressed and loaded for decoding. }] ; DEPENDENCY: utility.r handle-message': closure [handlers port message callback] [ if tail? handlers [callback none exit] do first handlers port :message func [result] [ either :result [ callback :result ] [ handle-message' next handlers port :message :callback ] ] ] handle-message: func [port message callback] [ debug true ["handle-message" mold :message] handle-message' handlers port :message :callback ] handlers: [ ] ; DEPENDENCY: utility.r encode-message: efunc [port message] [ compress mold/all :message ] decode-message: efunc [port message] [ attempt [first load/all decompress to binary! message] ] #do [document { ---SEND-MESSAGE The SEND-MESSAGE function takes the following arguments: :port [port! url!] - Message destination. You can use an UDP:// or HTTP:// URL (when using the /ASYNC refinement, you'll need something like AHTTP:// instead of HTTP://; the implementation assumes AHTTP://). It is also possible to pass a port directly (opened or not); remember that if it is not an UDP port, then HTTP alike semantics is assumed, and when /ASYNC, AHTTP:// alike semantics is assumed. :message - Message to send as request. It can be any REBOL value. :/async callback [function!] - Do not wait for the result, return immediately instead. The CALLBACK function is called when the result is available. It must accept one argument, the result. :/timeout maxtime [time! number!] - Only valid if the port is an UDP port. Time out if there's no answer after maxtime (by default, maxtime is 3 seconds). Unless /ASYNC is used, the result of the function is the REBOL value sent as response by the detination. The result may be NONE if there's a problem sending the message or a timeout. If there's an error at the destination, the result will be a block starting with the word ERROR. The second element of this block will be a word identifying the error (e.g. NO-HANDLER if there was no handler for the message); the optional third element is usually a string with an english description of the error, with other values following if useful. Note that when using UDP, the message size may not exceed about 64 KB. The function will throw an error if it does. Use HTTP for larger messages. }] send-message: func [ "Send a value to a remote application and obtain a response" [catch] port [port! url!] "Communication port or URL (HTTP or UDP)" message "Message to send" /async "Do not wait for the result, return immediately instead" callback [function!] "Function to call when the result is available" /timeout maxtime [time! number!] "Time out if there's no answer after maxtime (default: 3 seconds for UDP, 5 for HTTP)" /local lclose result timeout-action id ] [ id: random 10000 debug true [id "send-message" either port? port [rejoin [port/scheme "://" port/host ":" port/port-id]] [mold port] mold :message] if not port? port [port: make port! port] message: encode-message port :message either port/scheme = 'udp [ ; PROBLEM: does this really work? if port/state/flags = 0 [ port: throw-on-error [open/binary/direct/no-wait port] lclose: :close ] if 64512 < length? message [ throw make error! "The message is too long for the UDP transport" ] insert port message either async [ localize [timeout-action lclose port callback] [ ; DEPENDENCY: timers.r do-after any [maxtime 3] timeout-action: copy [ if-error [lclose port] [print "Hmmm. :/"] wait-stop port callback none ] port/awake: func [port /local message] [ ;print "awake" message: make binary! 65538 if 0 < read-io port message 65536 [ message: decode-message port message lclose port wait-stop port remove-timer timeout-action localize [message] [do-after 0 [callback :message]] ] false ] ] debug 10 < length? system/ports/wait-list ["[1] Wait list too long!!!" length? system/ports/wait-list] ; DEPENDENCY: async-protocol.r wait-start port none ] [ result: make binary! 65538 ; READ-IO clears the port's awake status result: either 0 < read-io port result 65536 [ decode-message port result ] [ either wait reduce [port any [maxtime 3]] [ decode-message port copy port ] [ none ] ] ; local CLOSE, will close only if we opened it lclose port :result ] ] [ ; everything else is assumed to have http://-alike semantics ; for async, you need ahttp:// or similar in this version. ; a version for the new core will be done later on. ; PROBLEM: cannot pass an already opened port (we need a handler supporting keep-alive) port: throw-on-error [open/direct/custom/binary/no-wait port reduce ['POST message]] either async [ localize [port callback timeout-action id] [ ; assumes async://-style awake port/awake: func [port event [word! error!]] [ debug true "ahttp awake" if error? :event [ debug true ["!> send-message" id "error"] do-after 0 [callback none] remove-timer timeout-action attempt [close port] return false ] if event = 'close [ use [message] [ message: decode-message port copy port debug true ["!> send-message" id "result:" mold :message] do-after 0 [callback :message] ] remove-timer timeout-action attempt [close port] ] false ] do-after any [maxtime 5] timeout-action: copy [ debug true ["!> send-message" id "timeout"] callback none callback: none attempt [close port] ] ] none ] [ result: either wait reduce [port any [maxtime 5]] [ decode-message port copy port ] [ none ] attempt [close port] :result ] ] ] #do [document { ---LISTEN-MESSAGES The LISTEN-MESSAGES function implements a mini HTTP server to handle messages coming via HTTP. This server is very basic and the implementation will not be documented here as it is subject to improvements. }] ; QAD parse-header patch header-rules: context [ template: none w: none set-word: none head-list: none spot: none full-line: none line: none invalid: [] spaces: make bitset! #{ 0002000001000000000000000000000000000000000000000000000000000000 } leader: make bitset! #{ 0026000001000000000000000000000000000000000000000000000000000000 } field-chars: make bitset! #{ 000000000060FF03FEFFFF87FEFFFF0700000000000000000000000000000000 } nlchars: charset "^M^J" non-nlchars: complement nlchars new-line: [copy line some non-nlchars [newline | CRLF | "^M"]] field-value: [ (full-line: make string! 32) new-line (all [line insert tail full-line trim line]) any [ some spaces new-line (insert insert tail full-line " " trim any [line ""]) ] ] message-header: [ copy w some field-chars ":" field-value ( w: to-word w set-word: to-set-word w either all [template in template w block? template/:w] [ any [ spot: select head-list set-word insert/only insert tail head-list set-word spot: copy [] ] ] [ spot: insert tail head-list set-word ] insert tail spot full-line ) ] content: [ full-line: (insert insert tail head-list to-set-word "content" full-line) to end ] message: [ (head-list: make block! 10) any leader some [ message-header | [newline | CRLF | "^M"] content break | copy line some non-nlchars [newline | CRLF | "^M"] (insert tail invalid line) ] ] system/words/parse-header: parse-head: func [ {Returns a header object with header fields and their values} parent [object! none!] "Default header object" data [any-string!] "String to parse" /multiple "Obsolete. Here for compatibility only." ][ clear invalid template: parent any [ parse/all data message net-error "Headers not correctly parsed" ] make either parent [parent] [object!] head-list ] ] request-header: context [ Content-Length: none ] chars: complement charset " ^M^J" send-result: func [port data] [ debug true "send-result" ;attempt [ port/awake: func [port event [word! error!]] [ if error? :event [ debug true "send-result insert error" attempt [close port] return false ] if event = 'write [ debug true "send-result insert finished, closing port" if-error [close port] [debug true "Error closing port..."] ] false ] debug true "inserting result..." insert port rejoin [ "HTTP/1.0 200 Ok^M^J" "Content-Length: " length? data CRLF "Content-Type: application/octet-stream^M^J" CRLF to string! data ] ;] ;wait-stop port ] http-cont-awake: func [port event [word! error!] /local data] [ debug true "http-cont-awake" if error? :event [ attempt [close port] return false ] if event = 'read [ either all [data: attempt [copy port] not empty? data] [ debug true ["[1] accessing port/user-data of:" port/scheme port/host port/port-id] append port/user-data/content data if port/user-data/Content-Length = length? port/user-data/content [ port/awake: func [port event [word! error!]] [false] localize [port] [ process-message port port/user-data/content func [result] [ send-result port any [result encode-message port [ERROR NO-HANDLER "I cannot handle that request"]] ] ] ] ] [ attempt [close port] ;wait-stop port ] ] false ] http-awake: closure [port event [word! error!] /local data] [ debug true "http-awake" if error? :event [ attempt [close port] return false ] if event = 'read [ either all [data: attempt [copy port] not empty? data] [ debug true ["http-awake got data:" mold data] debug true ["[2] accessing port/user-data of:" port/scheme port/host port/port-id] either port/user-data [ either object? port/user-data [ port/awake: func [port event [word! error!]] [false] either port/user-data/Content-Length = length? port/user-data/content [ ; no more input needed process-message port port/user-data/content func [result] [ send-result port any [result encode-message port [ERROR NO-HANDLER "I cannot handle that request"]] ] ] [ port/awake: :http-cont-awake ] ] [ append port/user-data data ] ] [ port/user-data: data ] if parse/all port/user-data [ "GET" some " " some chars some " " "HTTP/1." ["0" | "1"] ["^M^J^M^J" end | "^M^J" thru "^M^J^M^J" end] ] [ insert port "HTTP/1.0 404 Not found^M^J^M^J" port/awake: func [port event [word! error!]] [if event = 'write [attempt [close port]] false] return false ] if parse/all port/user-data [ "POST" some " " some chars some " " "HTTP/1." ["0" | "1"] "^M^J" data: thru "^M^J^M^J" to end ] [ debug true ["[2] Changing user-data of:" port/scheme port/host port/port-id] port/user-data: parse-header request-header data port/awake: func [port event [word! error!]] [false] either all [ port/user-data/Content-Length attempt [port/user-data/Content-Length: to integer! port/user-data/Content-Length] port/user-data/Content-Length > 0 ] [ either port/user-data/Content-Length = length? port/user-data/content [ ; no more input needed process-message port port/user-data/content func [result] [ send-result port any [result encode-message port [ERROR NO-HANDLER "I cannot handle that request"]] ] ] [ port/awake: :http-cont-awake ] ] [ insert port "HTTP/1.0 400 Bad Request^M^J^M^J" port/awake: func [port event [word! error!]] [if event = 'write [attempt [close port]] false] ] ] ] [ debug true "closing port" attempt [close port] ;wait-stop port ] ] false ] handle-new-connection: func [port] [ debug true "handle-new-connection" port: open make port! [ scheme: 'async sub-port: port remote-ip: port/remote-ip remote-port: port/remote-port ] port/awake: :http-awake debug true ["Changing user-data of:" port/scheme port/host port/port-id] port/user-data: none debug 10 < length? system/ports/wait-list ["[2] Wait list too long!!!" length? system/ports/wait-list] ] #do [document { The LISTEN-MESSAGES function takes the following arguments: :port [port! url!] - Port or URL to listen on. It can be an (already opened or not) listen port, or a listen URL (e.g. udp://:10000). Only UDP and TCP are supported. For TCP, even a normal port is supported, and it is treated as a new connection obtained from a listen port. (This can be used to make it work as a server on a client connection, for e.g. when under NAT etc. HTTP keep-alive support would be needed for maximum performance in this case, but this is not yet implemented.) If the port is TCP, requests are assumed to follow the HTTP protocol. :/async - Do not wait, return immediately instead. When this refinement is used, everything is set up for message listening. The user can then WAIT [] afterwards to enable message handling as usual. }] listen-messages: func [ "Listen for messages and handle them" ;[catch] port [port! url!] "Port or URL to listen on" /async "Do not wait here, but set things up and return immediately" ] [ if not port? port [port: make port! port] switch/default port/scheme [ udp [ ; PROBLEM: does this really work? if port/state/flags = 4194304 [ ;port: throw-on-error [open/binary/direct/no-wait port] port: open/binary/direct/no-wait port ] port/awake: func [port /local message] [ message: make binary! 65538 if 0 < read-io port message 65536 [ localize [port] [ process-message port message func [result] [ result: any [result encode-message port [ERROR NO-HANDLER "I cannot handle that request"]] if 64512 < length? result [ result: encode-message port [ERROR TOO-BIG "Result too big for UDP"] ] insert port result ] ] ] false ] either async [ ; DEPENDENCY: async-protocol.r debug 10 < length? system/ports/wait-list ["[3] Wait list too long!!!" length? system/ports/wait-list] wait-start port none ] [ wait port ] ] tcp [ ; start HTTP mini server ; PROBLEM: does this really work? if port/state/flags = 4194304 [ ;port: throw-on-error [open/direct/binary/no-wait port] port: open/direct/binary/no-wait port ] either port/host [ ; normal port, act as if it was a new connection handle-new-connection port if not async [wait [ ]] ] [ ; listen port port/awake: func [port] [ debug true "listen awake" handle-new-connection first port false ] either async [ ; DEPENDENCY: async-protocol.r debug 10 < length? system/ports/wait-list ["[4] Wait list too long!!!" length? system/ports/wait-list] wait-start port none ] [ wait port ] ] ] ] [ ;throw make error! "Unsupported port scheme" make error! "Unsupported port scheme" ] ] #do [document { ---PROCESS-MESSAGE This function can be used to handle messages in a CGI script, so that it is possible to use an existing web server instead of the built-in mini HTTP server to listen for messages. It gets the data (as a string! or binary! etc.) that was received with POST (for e.g.) and returns another string! or binary! that should be sent as the script output (i.e. the result). The result is returned calling the provided callback function. This could be using to process messages coming from other protocols, too. For example you could process emails and so on; however, SEND-MESSAGE only supports UDP and HTTP so far. }] process-message: closure [ "Handle one message" port [port! none!] message [any-string!] callback [any-function!] ] [ ;debug true ["process-message" mold :message] handle-message port decode-message port message func [result] [ ;debug true ["!> result:" mold :result] callback if :result [encode-message port :result] ] ]