<?php
namespace ZF\Net\XMPP\XML;
use \ZF\Net\XMPP;
class Stream {
const SECURE_NONE = 0;
const SECURE_SSL = 1;
const SECURE_TLS = 2;
protected static $socket_protocols = array(
self::SECURE_NONE => "tcp://",
self::SECURE_SSL => "ssl://",
self::SECURE_TLS => "tls://",
);
protected $sock = NULL;
protected $parser = NULL;
protected $xml_depth = 0;
protected $namespaces = array();
protected $packet_ready = false;
protected $stream_closed = false;
protected $stream_depth = 0;
protected $connected = false;
public $debug = false;
public $print_data = false;
public $error = '';
public $stream = NULL;
public $current_tag = NULL;
protected $params = array(
"server" => "127.0.0.1",
"port" => 5222,
"secure" => self::SECURE_NONE,
"read_timeout" => 5000,
"wait_read_timeout" => 50000,
"write_timeout" => 5000,
"connect_timeout" => 30
);
public function __construct($params) {
$this->params = array_merge($this->params, $params);
}
public function connect() {
$this->sock = @fsockopen(self::$socket_protocols[$this->params["secure"]].$this->params["server"],
$this->params["port"], $errstr, $errno, $this->params["connect_timeout"]);
if (!$this->sock) {
$this->error("Can't open socket: ".$errstr." (".$errno.")");
return false;
}
$this->connected = true;
$this->setupParser();
}
public function free() {
if ($this->xml_depth > 0)
$this->current_tag->elements = array();
$this->packet_ready = false;
return $this;
}
public function send($data) {
if (!$this->connected)
throw new XMPPStreamException("Connection not found!");
$read = array();
$write = array($this->sock);
$except = array();
if (feof($this->sock)) {
$this->error('Connection reset!');
$this->close();
return false;
}
$this->error = '';
$updated = @stream_select($read, $write, $except, NULL, NULL);
if ($updated === false) {
$this->error('Socket select error (send): '.socket_last_error($this->sock));
$this->close();
return false;
} elseif ($updated > 0) {
@fwrite($this->sock, $data);
if ($this->print_data)
$this->logTransfer($data, true);
} else {
$this->error('Socket no ready to write (send): '.socket_last_error($this->sock));
$this->close();
return false;
}
return $this;
}
public function recv($while = true, $timeout = NULL) {
if (!$this->connected)
throw new XMPPStreamException("Connection not found!");
$this->free();
$this->error = '';
$start = microtime(true);
if ($timeout === NULL)
$timeout = $this->params["wait_read_timeout"];
if ($timeout === 0) {
$sec = NULL;
$usec = NULL;
} else {
$sec = floor($timeout / 1000000);
$usec = $timeout % 1000000;
}
if (feof($this->sock)) {
$this->error('Connection reset!');
$this->close();
return false;
}
while ($while) {
$read = array($this->sock);
$write = array();
$except = array();
$updated = @stream_select($read, $write, $except, $sec, $usec);
if ($updated === false) {
$this->close();
$this->error('Socket select error (recv): '.socket_last_error($this->sock));
return false;
} else if ($updated > 0) {
$data = preg_replace("/<\?xml.*?\?>/si", "", @fread($this->sock, 4096));
if (strlen($data) != 0) {
if ($this->print_data)
$this->logTransfer($data, false);
xml_parse($this->parser, $data, false);
}
// Если стрим отпал - выходим
if ($this->stream_closed) {
$this->error('Stream unexpected closed!');
return false;
}
if ($this->packet_ready)
break;
if (strlen($data) == 0) {
if ((microtime(true) - $start) * 1000 > $this->params['read_timeout']) {
$this->close();
$this->error('Timeout occured at recv! ('.$this->params['read_timeout'].' ms)');
return false;
}
}
} else {
if ((microtime(true) - $start) * 1000 > $this->params['read_timeout']) {
$this->close();
$this->error('Timeout occured at recv! ('.$this->params['read_timeout'].' ms)');
return false;
}
if ($this->packet_ready)
break;
}
}
return $this->current_tag->elements ? $this->current_tag->elements[0] : false;
}
public function setupParser() {
if (!$this->connected)
throw new XMPPStreamException("Connection not found!");
if ($this->parser)
xml_parser_free($this->parser);
$this->parser = xml_parser_create('UTF-8');
xml_parser_set_option($this->parser, XML_OPTION_SKIP_WHITE, true);
xml_set_object($this->parser, $this);
xml_set_element_handler($this->parser, 'startXML', 'endXML');
xml_set_character_data_handler($this->parser, 'charXML');
$this->xml_depth = 0;
$this->stream_depth = 0;
$this->packet_ready = false;
$this->stream_closed = false;
}
// xml callback
public function startXML($p, $name, $attrs = []) {
++$this->xml_depth;
$this->ns_map[$this->xml_depth] = array();
foreach ($attrs as $k => $v) {
if (strpos($k, ":") === false) continue;
list ($attr_name, $ns_prefix) = explode(":", $k, 2);
if ($attr_name != "XMLNS")
continue;
$this->ns_map[$this->xml_depth][$ns_prefix] = $v;
}
$ns = "";
if (isset($attrs["XMLNS"]))
$ns = $attrs["XMLNS"];
$ns_prefix = "";
if (strpos($name, ":") !== false) {
list ($ns_prefix, $tag_name) = explode(":", $name, 2);
for ($i = $this->xml_depth; $i >= 1; --$i) {
if (isset($this->ns_map[$i][$ns_prefix])) {
$ns = $this->ns_map[$i][$ns_prefix];
break;
}
}
} else
$tag_name = $name;
if ($this->xml_depth == 0) {
$this->data = new XMPP\XML\Object($tag_name, $ns_prefix, $ns, $attrs, NULL);
$this->current_tag = $this->data;
} else {
$el = new XMPP\XML\Object($tag_name, $ns_prefix, $ns, $attrs, $this->current_tag);
$this->current_tag->elements[] = $el;
$this->current_tag = $el;
}
}
// xml callback
public function endXML($p, $name, $attrs = []) {
array_pop($this->ns_map);
--$this->xml_depth;
// Если вышли на уровень стрима (ил ниже oO) - значит успешно считали всеь пакет
if ($this->xml_depth <= $this->stream_depth)
$this->packet_ready = true;
// ВНЕЗАПНО закрыли стрим со стороны сервера
if ($this->xml_depth < $this->stream_depth) {
$this->stream_closed = true;
--$this->stream_depth;
}
$this->current_tag = $this->current_tag->parent;
}
// xml callback
public function charXML($p, $data) {
$this->current_tag->value .= $data;
}
public function startStream() {
++$this->stream_depth;
if (!$this->send('<stream:stream to="'.htmlspecialchars($this->params["server"], ENT_QUOTES).'" version="1.0" '
.'xmlns="jabber:client" xmlns:stream="http://etherx.jabber.org/streams">'))
return false;
$tag = $this->recv();
if (!$tag)
return false;
if ($tag->name != "FEATURES") {
$this->error("Can't open stream: ".$tag->toString());
return false;
}
return true;
}
public function endStream() {
--$this->stream_depth;
if (!$this->send('</stream:stream>'))
return false;
$tag = $this->recv();
if (!$tag)
return false;
if ($tag->name != "STREAM") {
$this->error("Can't close stream: ".$tag->toString());
return false;
}
return true;
}
public function isConnected() {
return $this->connected && !feof($this->sock);
}
public function getStreamID() {
return $this->stream_depth;
}
public function close() {
if ($this->sock)
fclose($this->sock);
$this->connected = false;
}
// debug
public function logTransfer($data, $is_write) {
echo "\033[0;".($is_write ? 35 : 32)."m$data\033[0;0m\n";
}
public function error($msg) {
$this->error = $msg;
if ($this->debug)
echo "\033[0;31m$msg\033[0;0m\n";
}
}