Pthread (Threading for PHP - Share Nothing)

Brew (MacOS)

$ brew install php70-pthreads

pecl

$ pecl install pthread

Docker

$ docker pull jdecool/php-pthreads
$worker = new Worker();
$task = new class extends Threaded {
    public static $a;
};
$task::$a = new class extends Threaded {};
$worker->start();
$worker->stack($task);
$worker->shutdown();
$task = new class extends Threaded {
    public static $a;
};
$task::$a = new class extends Threaded {};
$pool = new Pool(2);
$pool->submit($task);
$pool->shutdown();
<?php
 
function demanding(...$params) {
  /* you have parameters here */
  return array(rand(), rand());
}
 
class Task extends Collectable {
  public function __construct(Threaded $result, $params) {
    $this->result = $result;
    $this->params = $params;
  }
 
  public function run() {
    $this->result[] = 
      demanding(...$this->params);
  }
 
  protected $result;
  protected $params;
}
 
$pool = new Pool(16);
 
$result = new Threaded();
 
while (@$i++<16) {
  $pool->submit(
    new Task($result, $argv));
}
 
$pool->shutdown();
 
var_dump($result);
?>
class Scanner {
    public function performScan() {
        // Add initial task
        $initialTask = "Task 1";
        TaskQueue::addTask($initialTask);
 
        $i = 0;
        while(true) {
            // Get task from queue
           $task = TaskQueue::getTask();
           if ($task == null)
               break;
 
            // Handle task
            $parser[$i] = new Parser($task);
            $parser[$i]->start();
 
            // Wait
            $thread = $parser[$i];
            $thread->synchronized(function() use($thread) {
                while (!$thread->awake) {
                    $thread->wait();
                }
            });
 
            // Join
            $i++;
        }
 
        // Done
        echo "Done\n";
    }
}
 
class Parser extends Thread {
    private $task;
 
    public function __construct($task) {
        $this->task = $task;
    }
 
    public function run() {
        // Perform a time-consuming operation
        // This operation adds an unknown number of extra tasks
        sleep(1);
 
        // Add new tasks to queue
        foreach(range(0, 4) as $i) {
            TaskQueue::addTask("Task {$i}");
        }
 
        // Notify
        $this->synchronized(function(){
            $this->awake = true;
            $this->notify();
        });
    }
}
 
class TaskQueue {
    private static $queue = array();
 
    public static function addTask($task) {
        self::$queue[] = $task;
        echo "Add task to queue!\n";
    }
 
    public static function getTask() {
        if (sizeof(self::$queue) > 0) {
            $task = array_shift(self::$queue);
            echo "Get task from queue!\n";
            return $task;
        }
    }
}
 
$scanner = new Scanner();
$scanner->performScan();
<?php
class Referee extends Threaded {
 
    public function find(string $ident, Threaded $reference) {
        return $this->synchronized(function () use($ident, $reference) {
            if (isset($this[$ident])) {
                return $this[$ident];
            } else return ($this[$ident] = $reference);
        });
    }
 
    public function foreach(Closure $closure) {
        $this->synchronized(function() use($closure) {
            foreach ($this as $ident => $reference) {
                $closure($ident, $reference);
            }
        });
    }
}
 
class Test extends Thread {
 
    public function __construct(Referee $referee, string $ident, bool $delay) {
        $this->referee = $referee;
        $this->ident   = $ident;
        $this->delay   = $delay;
    }
 
    public function run() {
        while (1) {
            if ($this->delay) {
                $this->synchronized(function(){
                    $this->wait(1000000);
                });
            }
 
            $reference = 
                $this->referee->find($this->ident, $this);
 
            /* do something with reference here, I guess */         
 
            /* do something with all references here */
            $this->referee->foreach(function($ident, $reference){
                var_dump(Thread::getCurrentThreadId(),
                        $reference->getIdent(), 
                        $reference->isRunning());
            });
        }
    }
 
    public function getIdent() {
        return $this->ident;
    }
 
    private $referee;
    private $ident;
    private $delay;
}
 
$referee = new Referee();
$threads = [];
$thread = 0;
$idents = [
    "smelly",
    "dopey",
    "bashful",
    "grumpy",
    "sneezy",
    "sleepy",
    "happy",
    "naughty"
];
 
while ($thread < 8) {
    $threads[$thread] = new Test($referee, $idents[$thread], rand(0, 1));
    $threads[$thread]->start();
    $thread++;
}
 
foreach ($threads as $thread)
    $thread->join();
?>
class Wallet{
    public $balance;
    public function __construct($money){
        $this->balance = $money;
    }
    public function getBalance(){
        return $this->balance;
    }
    public function setBalance($value){
        $this->balance = $value;
    }
}
class MyThread extends Thread{
    private $wallet;
    private $std;
    public function __construct($wallet,$std){
        $this->wallet = $wallet;
        $this->std = $std;
    }
    public function run(){
        $this->synchronized(function($thread){
                $hack = $this->wallet;
                if($hack->getBalance() - 80 >0){
                    sleep(1);
                    $hack->setBalance($hack->getBalance() - 80);
                    echo $this->getThreadId() . "reduce 80 successful<br/>Current num is:" . $hack->getBalance() . "<Br/>";
                    //Here is Wrong!  The result is bool(false)????!!!!
                    var_dump($hack == $this->wallet);
                }
                 else
                     echo $this->getThreadId() . "reduce fail<br/>Current num is:" . $hack->getBalance() . "<br/>";
 
        },$this->std);
    }
}
$wallet = new Wallet(200);
$std = new stdClass();
for($x=0;$x<3;$x++){
    $pool[] = new MyThread($wallet,$std);
    $pool[$x]->start();
}