Commit 2602a4b7 authored by Emiliano Balbuena's avatar Emiliano Balbuena

(wip): Algorithms refactor

1 merge request!406WIP: Top Feed algorithm changes
Pipeline #97745840 failed with stages
in 2 minutes and 46 seconds
......@@ -2,70 +2,67 @@
namespace Minds\Controllers\Cli;
use Minds\Core\Feeds\Elastic\Sync;
use Minds\Core\Minds;
use Minds\Cli;
use Minds\Core\Feeds\Elastic\Manager;
use Minds\Exceptions\CliException;
use Minds\Interfaces;
class Top extends Cli\Controller implements Interfaces\CliControllerInterface
{
/** @var Manager */
private $manager;
/** @var Sync */
private $sync;
public function __construct()
{
$minds = new Minds();
$minds->start();
$this->manager = new Manager();
$this->sync = new Sync();
}
public function help($command = null)
{
$this->out('Syntax usage: cli top sync_<type> --period=? --metric=?');
$this->out('Syntax usage: cli top sync_<type> --metric=?');
}
public function exec()
{
$this->out('Syntax usage: cli top sync_<type> --period=? --metric=?');
$this->out('Syntax usage: cli top sync_<type> --metric=?');
}
public function sync_activity()
{
return $this->syncBy('activity', null, $this->getOpt('period') ?? null, $this->getOpt('metric') ?? null);
return $this->syncBy('activity', null, $this->getOpt('metric') ?? null);
}
public function sync_images()
{
return $this->syncBy('object', 'image', $this->getOpt('period') ?? null, $this->getOpt('metric') ?? null);
return $this->syncBy('object', 'image', $this->getOpt('metric') ?? null);
}
public function sync_videos()
{
return $this->syncBy('object', 'video', $this->getOpt('period') ?? null, $this->getOpt('metric') ?? null);
return $this->syncBy('object', 'video', $this->getOpt('metric') ?? null);
}
public function sync_blogs()
{
return $this->syncBy('object', 'blog', $this->getOpt('period') ?? null, $this->getOpt('metric') ?? null);
return $this->syncBy('object', 'blog', $this->getOpt('metric') ?? null);
}
public function sync_groups()
{
return $this->syncBy('group', null, $this->getOpt('period') ?? null, $this->getOpt('metric') ?? null);
return $this->syncBy('group', null, $this->getOpt('metric') ?? null);
}
public function sync_channels()
{
return $this->syncBy('user', null, $this->getOpt('period') ?? null, $this->getOpt('metric') ?? null);
return $this->syncBy('user', null, $this->getOpt('metric') ?? null);
}
protected function syncBy($type, $subtype, $period, $metric)
protected function syncBy($type, $subtype, $metric)
{
if (!$period) {
throw new CliException('Missing --period flag');
}
if (!$metric) {
throw new CliException('Missing --metric flag');
}
......@@ -75,15 +72,15 @@ class Top extends Cli\Controller implements Interfaces\CliControllerInterface
$displayType = trim(implode(':', [$type, $subtype]), ':');
$this->out("Syncing {$displayType} {$period} -> {$metric}");
$this->out("Syncing {$displayType} -> {$metric}");
$this->manager
$this->sync
->setType($type ?: '')
->setSubtype($subtype ?: '')
->run([
'period' => $period,
'metric' => $metric,
]);
->setMetric($metric)
->setFrom(strtotime('-1 day') * 1000)
->setTo(time() * 1000)
->run();
$this->out("\nCompleted syncing '{$displayType}'.");
}
......
......@@ -9,7 +9,6 @@ use Minds\Core\Di\Di;
use Minds\Core\Search\Search;
use Minds\Entities\Entity;
use Minds\Core\EntitiesBuilder;
use Minds\Core\Trending\Aggregates;
class Manager
{
......@@ -25,14 +24,6 @@ class Manager
/** @var Entities */
protected $entities;
private $from;
private $to;
private $type = 'activity';
private $subtype = '';
public function __construct(
$repository = null,
$entitiesBuilder = null,
......@@ -43,29 +34,6 @@ class Manager
$this->entitiesBuilder = $entitiesBuilder ?: new EntitiesBuilder;
$this->entities = $entities ?: new Entities;
$this->search = $search ?: Di::_()->get('Search\Search');
$this->from = strtotime('-7 days') * 1000;
$this->to = time() * 1000;
}
/**
* @param string $type
* @return Manager
*/
public function setType($type)
{
$this->type = $type;
return $this;
}
/**
* @param string $subtype
* @return Manager
*/
public function setSubtype($subtype)
{
$this->subtype = $subtype;
return $this;
}
/**
......@@ -279,114 +247,4 @@ class Manager
return $entities;
}
public function run($opts = [])
{
$opts = array_merge([
'period' => null,
'metric' => null,
], $opts);
$maps = [
'12h' => [
'period' => '12h',
'from' => strtotime('-12 hours') * 1000,
],
'24h' => [
'period' => '24h',
'from' => strtotime('-24 hours') * 1000,
],
'7d' => [
'period' => '7d',
'from' => strtotime('-7 days') * 1000,
],
'30d' => [
'period' => '30d',
'from' => strtotime('-30 days') * 1000,
],
'1y' => [
'period' => '1y',
'from' => strtotime('-1 year') * 1000,
],
];
$period = $opts['period'];
if (!isset($maps[$period]['from'])) {
throw new \Exception('Invalid period');
}
$this->from = $maps[$period]['from'];
$type = $this->type;
if ($this->subtype) {
$type = implode(':', [$this->type, $this->subtype]);
}
switch ($opts['metric']) {
case 'up':
$metricMethod = 'getVotesUp';
$metricId = 'votes:up';
$sign = 1;
break;
case 'down':
$metricMethod = 'getVotesDown';
$metricId = 'votes:down';
$sign = -1;
break;
default:
throw new \Exception('Invalid metric');
}
//sync
$i = 0;
foreach ($this->{$metricMethod}() as $guid => $count) {
$countValue = $sign * $count;
$metric = new MetricsSync();
$metric
->setGuid($guid)
->setType($type)
->setMetric($metricId)
->setCount($countValue)
->setPeriod($maps[$period]['period'])
->setSynced(time());
try {
$this->repository->add($metric);
} catch (\Exception $e) {
}
$i++;
echo "\n$i: $guid -> $metricId = $countValue";
}
// clear any pending bulk inserts
$this->repository->bulk();
}
protected function getVotesUp()
{
$aggregates = new Aggregates\Votes;
$aggregates->setLimit(10000);
$aggregates->setType($this->type);
$aggregates->setSubtype($this->subtype);
$aggregates->setFrom($this->from);
$aggregates->setTo($this->to);
return $aggregates->get();
}
protected function getVotesDown()
{
$aggregates = new Aggregates\DownVotes;
$aggregates->setLimit(10000);
$aggregates->setType($this->type);
$aggregates->setSubtype($this->subtype);
$aggregates->setFrom($this->from);
$aggregates->setTo($this->to);
return $aggregates->get();
}
}
......@@ -6,26 +6,33 @@ use Minds\Traits\MagicAttributes;
/**
* Class MetricsSync
* @package Minds\Core\Feeds\Elastic
* @method string getMetric()
* @method string getPeriod()
* @method int|string getGuid()
* @method MetricsSync setGuid(int|string $guid)
* @method string getType()
* @method MetricsSync setType(string $type)
* @method string getMetric()
* @method MetricsSync setMetric(string $metric)
* @method int getCount()
* @method MetricsSync setCount(int $count)
* @method int getSynced()
* @method int|string getGuid()
* @method MetricsSync setSynced(int $synced)
*/
class MetricsSync
{
use MagicAttributes;
private $guid;
private $type;
/** @var int|string */
protected $guid;
private $metric;
/** @var string */
protected $type;
private $count;
/** @var string */
protected $metric;
private $period;
/** @var int */
protected $count;
private $synced;
/** @var int */
protected $synced;
}
......@@ -432,14 +432,9 @@ class Repository
}
}
public function add(MetricsSync $metric)
public function inc(MetricsSync $metric): bool
{
$body = [];
$key = $metric->getMetric() . ':' . $metric->getPeriod();
$body[$key] = $metric->getCount();
$body[$key . ':synced'] = $metric->getSynced();
$key = $metric->getMetric();
$this->pendingBulkInserts[] = [
'update' => [
......@@ -450,11 +445,22 @@ class Repository
];
$this->pendingBulkInserts[] = [
'doc' => $body,
'doc_as_upsert' => true,
'scripted_upsert' => true,
'script' => [
'source' => "
ctx._source[params.field] += params.delta;
ctx._source['@' + params.field + ':synced'] = params.synced;
",
'lang' => 'painless',
'params' => [
'field' => $key,
'delta' => $metric->getCount(),
'synced' => $metric->getSynced()
]
],
];
if (count($this->pendingBulkInserts) > 2000) { //1000 inserts
if (count($this->pendingBulkInserts) > 2000) { // ~1000 inserts
$this->bulk();
}
......@@ -464,10 +470,10 @@ class Repository
/**
* Run a bulk insert job (quicker).
*/
public function bulk()
public function bulk(): void
{
if (count($this->pendingBulkInserts) > 0) {
$res = $this->client->bulk(['body' => $this->pendingBulkInserts]);
$r = $this->client->bulk(['body' => $this->pendingBulkInserts]);
$this->pendingBulkInserts = [];
}
}
......
<?php
/**
* Sync
* @author edgebal
*/
namespace Minds\Core\Feeds\Elastic;
use Exception;
use Minds\Core\Trending\Aggregates;
class Sync
{
/** @var string */
protected $type;
/** @var string */
protected $subtype;
/** @var int */
protected $from;
/** @var int */
protected $to;
/** @var string */
protected $metric;
/** @var Repository */
protected $repository;
/**
* Sync constructor.
* @param Repository $repository
*/
public function __construct(
$repository = null
)
{
$this->repository = $repository ?: new Repository();
}
/**
* @param string $type
* @return Sync
*/
public function setType(string $type): Sync
{
$this->type = $type;
return $this;
}
/**
* @param string $subtype
* @return Sync
*/
public function setSubtype(string $subtype): Sync
{
$this->subtype = $subtype;
return $this;
}
/**
* @param int $from
* @return Sync
*/
public function setFrom(int $from): Sync
{
$this->from = $from;
return $this;
}
/**
* @param int $to
* @return Sync
*/
public function setTo(int $to): Sync
{
$this->to = $to;
return $this;
}
/**
* @param string $metric
* @return Sync
*/
public function setMetric(string $metric): Sync
{
$this->metric = $metric;
return $this;
}
/**
* @throws Exception
*/
public function run(): void
{
$type = $this->type;
if ($this->subtype) {
$type = implode(':', [$this->type, $this->subtype]);
}
switch ($this->metric) {
case 'up':
$metricMethod = 'getVotesUp';
$metricId = 'votes:up';
$sign = 1;
break;
case 'down':
$metricMethod = 'getVotesDown';
$metricId = 'votes:down';
$sign = -1;
break;
default:
throw new Exception('Invalid metric');
}
// Sync
$i = 0;
foreach ($this->{$metricMethod}() as $guid => $count) {
$countValue = $sign * $count;
$metric = new MetricsSync();
$metric
->setGuid($guid)
->setType($type)
->setMetric($metricId)
->setCount($countValue)
->setSynced(time());
try {
$this->repository->inc($metric);
} catch (Exception $e) {
error_log((string) $e);
}
echo sprintf("\n%s: %s -> %s = %s", ++$i, $guid, $metricId, $countValue);
}
// Clear any pending bulk inserts
$this->repository->bulk();
}
/**
* @return iterable
*/
protected function getVotesUp(): iterable
{
$aggregates = new Aggregates\Votes;
$aggregates->setLimit(10000);
$aggregates->setType($this->type);
$aggregates->setSubtype($this->subtype);
$aggregates->setFrom($this->from);
$aggregates->setTo($this->to);
return $aggregates->get();
}
/**
* @return iterable
*/
protected function getVotesDown(): iterable
{
$aggregates = new Aggregates\DownVotes;
$aggregates->setLimit(10000);
$aggregates->setType($this->type);
$aggregates->setSubtype($this->subtype);
$aggregates->setFrom($this->from);
$aggregates->setTo($this->to);
return $aggregates->get();
}
}
......@@ -10,6 +10,18 @@ class DownVotes extends Aggregate
{
protected $multiplier = -1;
protected $uniques = true;
/**
* @param bool $uniques
* @return DownVotes
*/
public function setUniques(bool $uniques): DownVotes
{
$this->uniques = $uniques;
return $this;
}
public function get()
{
$filter = [
......@@ -98,10 +110,12 @@ class DownVotes extends Aggregate
$result = $this->client->request($prepared);
$entities = [];
foreach ($result['aggregations']['entities']['buckets'] as $entity) {
$entities[$entity['key']] = $entity['uniques']['value'] * $this->multiplier;
$value = $this->uniques ?
($entity['uniques']['value'] ?: 1) :
$entity['doc_count'] ?: 1;
yield $entity['key'] => $value * $this->multiplier;
}
return $entities;
}
}
......@@ -8,11 +8,25 @@ use Minds\Core\Data\ElasticSearch;
class Votes extends Aggregate
{
/** @var bool */
protected $uniques = true;
protected $multiplier = 1;
private $page = -1;
private $partitions = 20;
/**
* @param bool $uniques
* @return Votes
*/
public function setUniques(bool $uniques): Votes
{
$this->uniques = $uniques;
return $this;
}
public function fetch()
{
$filter = [
......@@ -113,7 +127,11 @@ class Votes extends Aggregate
while ($this->page++ < $this->partitions - 1) {
$result = $this->fetch();
foreach ($result['aggregations']['entities']['buckets'] as $entity) {
yield $entity['key'] => ($entity['uniques']['value'] ?: 1) * $this->multiplier;
$value = $this->uniques ?
($entity['uniques']['value'] ?: 1) :
$entity['doc_count'] ?: 1;
yield $entity['key'] => $value * $this->multiplier;
}
}
}
......
Please register or to comment