File "SocketConsumer.php"
Full Path: /home/vantageo/public_html/cache/cache/cache/cache/cache/cache/cache/.wp-cli/wp-content/plugins/woo-product-filter/modules/promo/models/classes/lib/ConsumerStrategies/SocketConsumer.php
File size: 7.47 KB
MIME-type: text/x-php
Charset: utf-8
<?php
/**
* Portions of this class were borrowed from
* https://github.com/segmentio/analytwpf-php/blob/master/lib/Analytwpf/Consumer/Socket.php.
* Thanks for the work!
*
* WWWWWW||WWWWWW
* W W W||W W W
* ||
* ( OO )__________
* / | \
* /o o| MIT \
* \___/||_||__||_|| *
* || || || ||
* _||_|| _||_||
* (__|__|(__|__|
* (The MIT License)
*
* Copyright (c) 2013 Segment.io Inc. friends@segment.io
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the 'Software'), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
* Software.
*
* THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS
* OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
require_once(dirname(__FILE__) . '/AbstractConsumer.php');
/**
* Consumes messages and writes them to host/endpoint using a persistent socket
*/
class ConsumerStrategies_SocketConsumer extends ConsumerStrategies_AbstractConsumer {
private $_host;
private $_endpoint;
private $_connect_timeout;
private $_protocol;
private $_socket;
private $_async;
/**
* Creates a new SocketConsumer and assigns properties from the $options array
*
* @param array $options
*/
public function __construct( $options = array() ) {
parent::__construct($options);
$this->_host = $options['host'];
$this->_endpoint = $options['endpoint'];
$this->_connect_timeout = array_key_exists('connect_timeout', $options) ? $options['connect_timeout'] : 5;
$this->_async = array_key_exists('async', $options) && false === $options['async'] ? false : true;
if (array_key_exists('use_ssl', $options) && true == $options['use_ssl']) {
$this->_protocol = 'ssl';
$this->_port = 443;
} else {
$this->_protocol = 'tcp';
$this->_port = 80;
}
}
/**
* Write using a persistent socket connection.
*
* @param array $batch
* @return bool
*/
public function persist( $batch ) {
$socket = $this->_getSocket();
if (!is_resource($socket)) {
return false;
}
$data = 'data=' . $this->_encode($batch);
$body = '';
$body.= 'POST ' . $this->_endpoint . " HTTP/1.1\r\n";
$body.= 'Host: ' . $this->_host . "\r\n";
$body.= "Content-Type: application/x-www-form-urlencoded\r\n";
$body.= "Accept: application/json\r\n";
$body.= 'Content-length: ' . strlen($data) . "\r\n";
$body.= "\r\n";
$body.= $data;
return $this->_write($socket, $body);
}
/**
* Return cached socket if open or create a new persistent socket
*
* @return bool|resource
*/
private function _getSocket() {
if (is_resource($this->_socket)) {
if ($this->_debug()) {
$this->_log('Using existing socket');
}
return $this->_socket;
} else {
if ($this->_debug()) {
$this->_log('Creating new socket at ' . time());
}
return $this->_createSocket();
}
}
/**
* Attempt to open a new socket connection, cache it, and return the resource
*
* @param bool $retry
* @return bool|resource
*/
private function _createSocket( $retry = true ) {
try {
$socket = pfsockopen($this->_protocol . '://' . $this->_host, $this->_port, $err_no, $err_msg, $this->_connect_timeout);
if ($this->_debug()) {
$this->_log('Opening socket connection to ' . $this->_protocol . '://' . $this->_host . ':' . $this->_port);
}
if (0 != $err_no) {
$this->_handleError($err_no, $err_msg);
return true == $retry ? $this->_createSocket(false) : false;
} else {
// cache the socket
$this->_socket = $socket;
return $socket;
}
} catch (Exception $e) {
$this->_handleError($e->getCode(), $e->getMessage());
return true == $retry ? $this->_createSocket(false) : false;
}
}
/**
* Attempt to close and dereference a socket resource
*/
private function _destroySocket() {
$socket = $this->_socket;
$this->_socket = null;
fclose($socket);
}
/**
* Write $data through the given $socket
*
* @param $socket
* @param $data
* @param bool $retry
* @return bool
*/
private function _write( $socket, $data, $retry = true ) {
$bytes_sent = 0;
$bytes_total = strlen($data);
$socket_closed = false;
$success = true;
$max_bytes_per_write = 8192;
// if we have no data to write just return true
if (0 == $bytes_total) {
return true;
}
// try to write the data
while (!$socket_closed && $bytes_sent < $bytes_total) {
try {
$bytes = fwrite($socket, $data, $max_bytes_per_write);
if ($this->_debug()) {
$this->_log('Socket wrote ' . $bytes . ' bytes');
}
// if we actually wrote data, then remove the written portion from $data left to write
if ($bytes > 0) {
$data = substr($data, $max_bytes_per_write);
}
} catch (Exception $e) {
$this->_handleError($e->getCode(), $e->getMessage());
$socket_closed = true;
}
if (isset($bytes) && $bytes) {
$bytes_sent += $bytes;
} else {
$socket_closed = true;
}
}
// create a new socket if the current one is closed and retry the message
if ($socket_closed) {
$this->_destroySocket();
if ($retry) {
if ($this->_debug()) {
$this->_log('Retrying socket write...');
}
$socket = $this->_getSocket();
if ($socket) {
return $this->_write($socket, $data, false);
}
}
return false;
}
// only wait for the response in debug mode or if we explicitly want to be synchronous
if ($this->_debug() || !$this->_async) {
$res = $this->handleResponse(fread($socket, 2048));
if ('200' != $res['status']) {
$this->_handleError($res['status'], $res['body']);
$success = false;
}
}
return $success;
}
/**
* Parse the response from a socket write (only used for debugging)
*
* @param $response
* @return array
*/
private function handleResponse( $response ) {
$lines = explode("\n", $response);
// extract headers
$headers = array();
foreach ($lines as $line) {
$kvsplit = explode(':', $line);
if (count($kvsplit) == 2) {
$header = $kvsplit[0];
$value = $kvsplit[1];
$headers[$header] = trim($value);
}
}
// extract status
$line_one_exploded = explode(' ', $lines[0]);
$status = $line_one_exploded[1];
// extract body
$body = $lines[count($lines) - 1];
// if the connection has been closed lets kill the socket
if ('close' == $headers['Connection']) {
$this->_destroySocket();
if ($this->_debug()) {
$this->_log("Server told us connection closed so lets destroy the socket so it'll reconnect on next call");
}
}
$ret = array(
'status' => $status,
'body' => $body,
);
return $ret;
}
}