1 package OpenSRF::Transport::SlimJabber::XMPPReader;
2 use strict; use warnings;
4 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
5 use Time::HiRes qw/time/;
6 use OpenSRF::Transport::SlimJabber::XMPPMessage;
7 use OpenSRF::Utils::Logger qw/$logger/;
9 # -----------------------------------------------------------
10 # Connect, disconnect, and authentication messsage templates
11 # -----------------------------------------------------------
12 use constant JABBER_CONNECT =>
13 "<stream:stream to='%s' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>";
15 use constant JABBER_BASIC_AUTH =>
16 "<iq id='123' type='set'><query xmlns='jabber:iq:auth'>" .
17 "<username>%s</username><password>%s</password><resource>%s</resource></query></iq>";
19 use constant JABBER_DISCONNECT => "</stream:stream>";
22 # -----------------------------------------------------------
24 # -----------------------------------------------------------
25 use constant DISCONNECTED => 1;
26 use constant CONNECT_RECV => 2;
27 use constant CONNECTED => 3;
30 # -----------------------------------------------------------
32 # -----------------------------------------------------------
33 use constant IN_NOTHING => 1;
34 use constant IN_BODY => 2;
35 use constant IN_THREAD => 3;
36 use constant IN_STATUS => 4;
39 # -----------------------------------------------------------
40 # Constructor, getter/setters
41 # -----------------------------------------------------------
46 my $self = bless({}, $class);
49 $self->{stream_state} = DISCONNECTED;
50 $self->{xml_state} = IN_NOTHING;
51 $self->socket($socket);
53 my $p = new XML::Parser(Handlers => {
54 Start => \&start_element,
59 $self->parser($p->parse_start); # create a push parser
60 $self->parser->{_parent_} = $self;
61 $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new;
67 push(@{$self->{queue}}, $msg) if $msg;
72 return shift @{$self->{queue}};
77 return (@{$self->{queue}} > 0);
81 my($self, $parser) = @_;
82 $self->{parser} = $parser if $parser;
83 return $self->{parser};
87 my($self, $socket) = @_;
88 $self->{socket} = $socket if $socket;
89 return $self->{socket};
93 my($self, $stream_state) = @_;
94 $self->{stream_state} = $stream_state if $stream_state;
95 return $self->{stream_state};
99 my($self, $xml_state) = @_;
100 $self->{xml_state} = $xml_state if $xml_state;
101 return $self->{xml_state};
105 my($self, $message) = @_;
106 $self->{message} = $message if $message;
107 return $self->{message};
111 # -----------------------------------------------------------
112 # Stream and connection handling methods
113 # -----------------------------------------------------------
116 my($self, $domain, $username, $password, $resource) = @_;
118 $self->send(sprintf(JABBER_CONNECT, $domain));
121 unless($self->{stream_state} == CONNECT_RECV) {
122 $logger->error("No initial XMPP response from server");
126 $self->send(sprintf(JABBER_BASIC_AUTH, $username, $password, $resource));
129 unless($self->connected) {
130 $logger->error('XMPP connect failed');
139 if($self->tcp_connected) {
140 $self->send(JABBER_DISCONNECT);
141 shutdown($self->socket, 2);
143 close($self->socket);
146 # -----------------------------------------------------------
147 # returns true if this stream is connected to the server
148 # -----------------------------------------------------------
151 return ($self->tcp_connected and $self->{stream_state} == CONNECTED);
154 # -----------------------------------------------------------
155 # returns true if the socket is connected
156 # -----------------------------------------------------------
159 return ($self->socket and $self->socket->connected);
162 # -----------------------------------------------------------
163 # sends pre-formated XML
164 # -----------------------------------------------------------
166 my($self, $xml) = @_;
167 $self->{socket}->print($xml);
170 # -----------------------------------------------------------
171 # Puts a file handle into blocking mode
172 # -----------------------------------------------------------
175 my $flags = fcntl($fh, F_GETFL, 0);
176 $flags &= ~O_NONBLOCK;
177 fcntl($fh, F_SETFL, $flags);
181 # -----------------------------------------------------------
182 # Puts a file handle into non-blocking mode
183 # -----------------------------------------------------------
186 my $flags = fcntl($fh, F_GETFL, 0);
187 fcntl($fh, F_SETFL, $flags | O_NONBLOCK);
192 my($self, $timeout) = @_;
194 return $self->next_msg if $self->peek_msg;
197 $timeout = undef if $timeout < 0;
198 my $socket = $self->{socket};
202 # build the select readset
204 vec($infile, $socket->fileno, 1) = 1;
205 return undef unless select($infile, undef, undef, $timeout);
207 # now slurp the data off the socket
209 my $read_size = 1024;
210 while(my $n = sysread($socket, $buf, $read_size)) {
211 $self->{parser}->parse_more($buf) if $buf;
212 if($n < $read_size or $self->peek_msg) {
216 set_nonblock($socket);
219 return $self->next_msg;
222 # -----------------------------------------------------------
223 # Waits up to timeout seconds for a fully-formed XMPP
224 # message to arrive. If timeout is < 0, waits indefinitely
225 # -----------------------------------------------------------
227 my($self, $timeout) = @_;
230 $timeout = 0 unless defined $timeout;
234 return $xml if $xml = $self->wait($timeout);
238 while($timeout >= 0) {
240 return $xml if $xml = $self->wait($timeout);
241 $timeout -= time - $start;
249 # -----------------------------------------------------------
251 # -----------------------------------------------------------
255 my($parser, $name, %attrs) = @_;
256 my $self = $parser->{_parent_};
258 if($name eq 'message') {
260 my $msg = $self->{message};
261 $msg->{to} = $attrs{'to'};
262 $msg->{from} = $attrs{router_from} if $attrs{router_from};
263 $msg->{from} = $attrs{from} unless $msg->{from};
264 $msg->{osrf_xid} = $attrs{'osrf_xid'};
265 $msg->{type} = $attrs{type};
267 } elsif($name eq 'body') {
268 $self->{xml_state} = IN_BODY;
270 } elsif($name eq 'thread') {
271 $self->{xml_state} = IN_THREAD;
273 } elsif($name eq 'stream:stream') {
274 $self->{stream_state} = CONNECT_RECV;
276 } elsif($name eq 'iq') {
277 if($attrs{type} and $attrs{type} eq 'result') {
278 $self->{stream_state} = CONNECTED;
281 } elsif($name eq 'status') {
282 $self->{xml_state } = IN_STATUS;
284 } elsif($name eq 'stream:error') {
285 $self->{stream_state} = DISCONNECTED;
287 } elsif($name eq 'error') {
288 $self->{message}->{err_type} = $attrs{'type'};
289 $self->{message}->{err_code} = $attrs{'code'};
290 $self->{stream_state} = DISCONNECTED;
295 my($parser, $chars) = @_;
296 my $self = $parser->{_parent_};
297 my $state = $self->{xml_state};
299 if($state == IN_BODY) {
300 $self->{message}->{body} .= $chars;
302 } elsif($state == IN_THREAD) {
303 $self->{message}->{thread} .= $chars;
305 } elsif($state == IN_STATUS) {
306 $self->{message}->{status} .= $chars;
311 my($parser, $name) = @_;
312 my $self = $parser->{_parent_};
313 $self->{xml_state} = IN_NOTHING;
315 if($name eq 'message') {
316 $self->push_msg($self->{message});
317 $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new;
319 } elsif($name eq 'stream:stream') {
320 $self->{stream_state} = DISCONNECTED;
326 my $socket = $self->socket;
327 return 0 unless $socket and $socket->connected;
329 my $flags = fcntl($socket, F_GETFL, 0);
330 fcntl($socket, F_SETFL, $flags | O_NONBLOCK);
332 while( my $n = sysread( $socket, my $buf, 8192 ) ) {
333 $logger->debug("flush_socket dropped $n bytes of data");
334 $logger->error("flush_socket dropped data on disconnected socket: $buf")
335 unless($socket->connected);
338 fcntl($socket, F_SETFL, $flags);
339 return 0 unless $socket->connected;