memcachedサーバをPHPで作ってみた

Peclのmemcacheライブラリで接続出来るmemcachedサーバをPHPで作ってみた。
実用性はない。
 
memcachedプロトコルはかなり楽。
DNSプロトコルについて調べた後だからかも知れないけど。
PHPmemcachedは11212ポートで動かしておく。
本家のポートの隣。
 
一発で送る必要があるから、Telnetじゃ動かない。
複数行を想定するASCIIのソケットプログラミングは面倒臭い。
HTTPとかも。
 

本家memcachedと同じ動きをするか確認

大体あってた。
 

ソース
<?php
if (!in_array(@$argv[1], array('c', 'php'))) {
	echo "usage: memcached_client.php (php|c)\n";
	exit();
}
$port = $argv[1]=='c' ? 11211 : 11212;

$memcache = new Memcache;
$memcache->connect('127.0.0.1', $port);
$memcache->flush();
$memcache->set('deleteparam', "ddddddddddd", 0, 3);
$memcache->set('aliveparam', "aaaaaa", 0, 0);
$memcache->set('de', "100", 0, 0);
$memcache->set('in', "100", 0, 0);
$memcache->flush(6);
for ($i=0; $i<9; $i++) {
	echo "-- $i ------------\n";
	var_dump($memcache->get('deleteparam'));
	var_dump($memcache->get('aliveparam'));
	var_dump($memcache->decrement('de', $i));
	var_dump($memcache->increment('in', $i));
	sleep(1);
}
var_dump($memcache->getVersion());
$memcache->close();

 

本家実行結果
$ php memcached_client.php c
-- 0 ------------
string(11) "ddddddddddd"
string(6) "aaaaaa"
int(100)
int(100)
-- 1 ------------
string(11) "ddddddddddd"
string(6) "aaaaaa"
int(99)
int(101)
-- 2 ------------
string(11) "ddddddddddd"
string(6) "aaaaaa"
int(97)
int(103)
-- 3 ------------
bool(false)
string(6) "aaaaaa"
int(94)
int(106)
-- 4 ------------
bool(false)
string(6) "aaaaaa"
int(90)
int(110)
-- 5 ------------
bool(false)
bool(false)
bool(false)
bool(false)
-- 6 ------------
bool(false)
bool(false)
bool(false)
bool(false)
-- 7 ------------
bool(false)
bool(false)
bool(false)
bool(false)
-- 8 ------------
bool(false)
bool(false)
bool(false)
bool(false)
string(5) "1.2.8"

 

PHP実行結果
$ php memcached_client.php php
-- 0 ------------
string(11) "ddddddddddd"
string(6) "aaaaaa"
int(100)
int(100)
-- 1 ------------
string(11) "ddddddddddd"
string(6) "aaaaaa"
int(99)
int(101)
-- 2 ------------
string(11) "ddddddddddd"
string(6) "aaaaaa"
int(97)
int(103)
-- 3 ------------
bool(false)
string(6) "aaaaaa"
int(94)
int(106)
-- 4 ------------
bool(false)
string(6) "aaaaaa"
int(90)
int(110)
-- 5 ------------
bool(false)
string(6) "aaaaaa"
int(85)
int(115)
-- 6 ------------
bool(false)
bool(false)
bool(false)
bool(false)
-- 7 ------------
bool(false)
bool(false)
bool(false)
bool(false)
-- 8 ------------
bool(false)
bool(false)
bool(false)
bool(false)
string(8) "0.1(PHP)"

 

ベンチマーク

平均して2.2倍位。
Memcacheはやっぱりすごかった - アシアルブログで書かれてるMySQLAPCとの比較から察するに、
APC >>>>>>> memcached >>> PHP memcached >>>> MySQL
多分これ位。
ところでAPCとは何だろう。
 

ソース
<?php
if (!in_array(@$argv[1], array('c', 'php'))) {
	echo "usage: memcached_bench.php (php|c)\n";
	exit();
}
$port = $argv[1]=='c' ? 11211 : 11212;

$memcache = new Memcache;
$memcache->connect('127.0.0.1', $port);
$memcache->flush();

// set
$start = microtime(true);
for ($i=0; $i<100000; $i++) {
	$memcache->set(md5($i), sha1($i));
}
$set_time = microtime(true) - $start;

// get
$start = microtime(true);
for ($i=0; $i<100000; $i++) {
	$memcache->get(md5($i));
}
$get_time = microtime(true) - $start;

// delete
$start = microtime(true);
for ($i=0; $i<100000; $i++) {
	$memcache->delete(md5($i));
}
$delete_time = microtime(true) - $start;

echo "set time: {$set_time}\n";
echo "get time: {$get_time}\n";
echo "delete time: {$delete_time}\n";

 

本家実行結果
$ php memcached_bench.php c
set time: 4.1494679450989
get time: 3.6912648677826
delete time: 3.7728979587555

 

PHP実行結果
$ php memcached_bench.php php
set time: 8.9850809574127
get time: 8.3036599159241
delete time: 8.6144380569458

 

ソース

MemcachedServer.php
<?php
require_once(dirname(__FILE__).'/SocketServerTCP.php');
set_time_limit(0);

class MemcachedServer extends SocketServerTCP
{
	public $port = 11212;
	private $_data = array();
	private $_block = null;


	/* receive */

	protected function exec($sock_id, $buf)
	{
//		$filename = "./log";
//		file_put_contents($filename, $buf);
		$request = MemcachedServer::parseRequest($buf);
		if ($request) {
			$method = 'exec_'.$request['mode'];
			if (method_exists($this, $method)) {
				if (isset($request['key'])) {
					$this->check_expire($request['key']);
				}
				$this->$method($sock_id, $request);
				return;
			}
		}
		$this->send($sock_id, "ERROR\n");
	}


	/* return */

	private function exec_set($sock_id, $request)
	{
		$this->_data[$request['key']] = $request;
		$this->send($sock_id, "STORED\r\n");
	}

	private function exec_add($sock_id, $request)
	{
		if (!isset($this->_data[$request['key']])) {
			$this->exec_set($sock_id, $request);
			return;
		}
		$this->send($sock_id, "NOT_STORED\r\n");
	}

	private function exec_replace($sock_id, $request)
	{
		if (isset($this->_data[$request['key']]['var'])) {
			$this->exec_set($sock_id, $request);
			return;
		}
		$this->send($sock_id, "NOT_STORED\r\n");
	}

	private function exec_append($sock_id, $request)
	{
		if (isset($this->_data[$request['key']]['var'])) {
			$var = $this->_data[$request['key']]['var'];
			$this->_data[$request['key']] = $request;
			$this->_data[$request['key']]['var'] = $var.$this->_data[$request['key']]['var'];
			$this->send($sock_id, "STORED\r\n");
			return;
		}
		$this->send($sock_id, "NOT_STORED\r\n");
	}

	private function exec_prepend($sock_id, $request)
	{
		if (isset($this->_data[$request['key']]['var'])) {
			$var = $this->_data[$request['key']]['var'];
			$this->_data[$request['key']] = $request;
			$this->_data[$request['key']]['var'] .= $var;
			$this->send($sock_id, "STORED\r\n");
			return;
		}
		$this->send($sock_id, "NOT_STORED\r\n");
	}

	private function exec_incr($sock_id, $request)
	{
		if (isset($this->_data[$request['key']]['var'])) {
			$this->_data[$request['key']]['var'] += $request['var'];
			$this->send($sock_id, $this->_data[$request['key']]['var']."\r\n");
			return;
		}
		$this->send($sock_id, "NOT_FOUND\r\n");
	}

	private function exec_decr($sock_id, $request)
	{
		if (isset($this->_data[$request['key']]['var'])) {
			$this->_data[$request['key']]['var'] -= $request['var'];
			$this->send($sock_id, $this->_data[$request['key']]['var']."\r\n");
			return;
		}
		$this->send($sock_id, "NOT_FOUND\r\n");
	}

	private function exec_get($sock_id, $request)
	{
		$data = '';
		if (isset($this->_data[$request['key']]['var'])) {
			$data = sprintf(
				"VALUE %s %u %u\r\n%s",
				$request['key'],
				$this->_data[$request['key']]['flag'],
				$this->_data[$request['key']]['len'],
				$this->_data[$request['key']]['var']
			);
		}
		$data .= "END\r\n";
		$this->send($sock_id, $data);
	}

	private function exec_delete($sock_id, $request)
	{
		if (isset($this->_data[$request['key']]['var'])) {
			if (isset($request['block'])) {
				$this->_data[$request['key']] = $request;
			} else {
				unset($this->_data[$request['key']]);
			}
			$this->send($sock_id, "DELETED\r\n");
			return;
		}
		$this->send($sock_id, "NOT_FOUND\r\n");
	}

	private function exec_flush_all($sock_id, $request)
	{
		$this->_block = $request['block'];
		if (is_null($request['block'])) {
			$this->_data = array();
		}
		$this->send($sock_id, "OK\r\n");
	}

	private function exec_quit($sock_id)
	{
		$this->close($sock_id);
	}

	private function exec_version($sock_id)
	{
		$this->send($sock_id, "VERSION 0.1(PHP)\r\n");
	}


	/* util */

	public static function parseRequest($req)
	{
		$result = null;
		@list($mode, $req) = explode(' ', $req, 2);
		$mode = trim($mode, "\r\n");
		switch ($mode) {
		case 'set':
		case 'add':
		case 'replace':
		case 'append':
		case 'prepend':
			@list($req, $var) = explode("\r\n", $req, 2);
			@list($key, $flag, $expire, $len) = explode(' ', $req);
			if ($expire==0) {
				$expire = null;
			} elseif ($expire<=2592000) {
				$expire = time()+$expire;
			}
			$result = array(
				'mode' => $mode,
				'key' => trim($key, "\r\n"),
				'flag' => $flag,
				'expire' => $expire,
				'len' => $len,
				'var' => $var
			);
			break;
		case 'incr':
		case 'decr':
			@list($key, $var) = explode(' ', $req);
			$result = array(
				'mode' => $mode,
				'key' => $key,
				'var' => $var
			);
			break;
		case 'get':
			$result = array(
				'mode' => $mode,
				'key' => trim($req, "\r\n")
			);
			break;
		case 'delete':
			@list($key, $block) = explode(' ', trim($req));
			$block = (int)trim($block);
			if ($block) {
				$block += time();
			} else {
				$block = null;
			}
			$result = array(
				'mode' => $mode,
				'key' => trim($key, "\r\n"),
				'block' => $block
			);
			break;
		case 'flush_all':
			$block = (int)trim($req);
			if ($block) {
				$block += time();
			} else {
				$block = null;
			}
			$result = array(
				'mode' => $mode,
				'block' => $block
			);
			break;
		case 'quit':
		case 'version':
			$result = array(
				'mode' => $mode
			);
			break;
		}
		return $result;
	}

	private function check_expire($key)
	{
		// flush block
		if (isset($this->_block) && $this->_block<=time()) {
			$this->_data = array();
			$this->_block = null;
			return;
		}
		// delete block
		if (isset($this->_data[$key]['block']) && $this->_data[$key]['block']<=time()) {
			unset($this->_data[$key]);
			return;
		}
		// expire
		if (isset($this->_data[$key]['expire']) && $this->_data[$key]['expire']<=time()) {
			unset($this->_data[$key]);
			return;
		}
	}
}


$memserver = new MemcachedServer();
try {
	$memserver->create();
	$memserver->bind();
	$memserver->listen();
	$memserver->run();
} catch(Exception $e) {
	echo $e->getMessage();
}

 

SocketServerTCP.php
<?php
class SocketServerTCP
{
	// server settings
	public $address = '0.0.0.0';
	public $port = 8000;
	public $backlog = 0;
	public $read_length = 4096;

	// resource
	protected $_sock;
	protected $_pool;

	// callback
	protected $_callback_open;
	protected $_callback_exec;
	protected $_callback_quit;

	public function __construct()
	{
		$this->_callback_open = array($this, 'open');
		$this->_callback_exec = array($this, 'exec');
		$this->_callback_quit = array($this, 'quit');
	}

	public function create()
	{
		if (($this->_sock = @socket_create(AF_INET, SOCK_STREAM, SOL_TCP)) === false) {
			throw new Exception("socket_create() failed: ".socket_strerror($this->_sock));
		}
	}

	public function bind()
	{
		if (@socket_bind($this->_sock, $this->address, $this->port) === false) {
			throw new Exception("socket_bind() failed: ".socket_strerror(socket_last_error($this->_sock)));
		}
	}

	public function listen()
	{
		if (@socket_listen($this->_sock, $this->backlog) === false) {
			throw new Exception("socket_listen() failed: ".socket_strerror(socket_last_error($this->_sock)));
		}
	}

	public function run()
	{
		$this->_pool = array($this->_sock);
		while (true) {
			// close
			foreach ($this->_pool as $sock_id => $sock) {
				if (!is_resource($sock)) {
					call_user_func_array($this->_callback_quit, array($sock_id));
				}
			}
			// copy
			$active = $this->_pool;
			// select
			if (@socket_select($active, $w = null, $e = null, null) === false) {
				throw new Exception("socket_select() failed: ".socket_strerror(socket_last_error($active)));
			}
			// new client
			if (in_array($this->_sock, $active)) {
				$sock = socket_accept($this->_sock);
				if (is_resource($sock)) {
					$sock_id = (integer)$sock;
					$this->_pool[$sock_id] = $sock;
					// callback open
					call_user_func_array($this->_callback_open, array($sock_id));
				}
				unset($active[array_search($this->_sock, $active)]);
			}
			// all client
			foreach ($active as $sock) {
				$sock_id = (integer)$sock;
				$buf = socket_read($sock, $this->read_length);
				if ($buf===false || strlen($buf)===0) {
					$this->_pool[$sock_id] = false;
					continue;
				}
				call_user_func_array($this->_callback_exec, array($sock_id, $buf));
			}
		}
	}

	public function getSockInfo($sock_id)
	{
		if (isset($this->_pool[$sock_id])) {
			$from = '';
			$port = 0;
			socket_getpeername($this->_pool[$sock_id], $from, $port);
			return array('address'=>$from, 'port'=>$port);
		}
		return null;
	}

	public function send($sock_id, $message)
	{
		if (isset($this->_pool[$sock_id]) && is_resource($this->_pool[$sock_id])) {
			socket_write($this->_pool[$sock_id], $message, strlen($message));
		}
	}

	public function close($sock_id)
	{
		if (isset($this->_pool[$sock_id])) {
			if (is_resource($this->_pool[$sock_id])) {
				socket_close($this->_pool[$sock_id]);
			}
			unset($this->_pool[$sock_id]);
		}
	}

	// handle

	protected function open($sock_id)
	{
		printf("OPEN : id:%s count:%s\n", $sock_id, count($this->_pool));
	}

	protected function exec($sock_id, $buf)
	{
		if (in_array(strtolower(trim($buf)), array('quit', 'exit', 'bye'))) {
			$this->quit($sock_id);
			return;
		}
		printf("EXEC : id:%s count:%s buffer:%s\n", $sock_id, count($this->_pool), $buf);
	}

	protected function quit($sock_id)
	{
		$this->close($sock_id);
		printf("QUIT : id:%s count:%s\n", $sock_id, count($this->_pool));
	}
}

 
MySQLが好き。