Commit 1b66c216 authored by Emiliano Balbuena's avatar Emiliano Balbuena

(refactor): Fetch votes in time window, then re-sync votes from past yr

1 merge request!406WIP: Top Feed algorithm changes
Pipeline #98441412 failed with stages
in 2 minutes and 43 seconds
......@@ -106,11 +106,15 @@ class Top extends Cli\Controller implements Interfaces\CliControllerInterface
}
if (!$from || !is_numeric($from)) {
throw new CliException('Missing --from flag');
throw new CliException('Missing or invalid --from flag');
}
if (!$to || !is_numeric($to)) {
throw new CliException('Missing --to flag');
if (!$to) {
$to = time();
}
if (!is_numeric($to)) {
throw new CliException('Invalid --to flag');
}
if ($from > $to) {
......
......@@ -432,7 +432,7 @@ class Repository
}
}
public function inc(MetricsSync $metric): bool
public function add(MetricsSync $metric): bool
{
$key = $metric->getMetric();
......@@ -445,18 +445,10 @@ class Repository
];
$this->pendingBulkInserts[] = [
'scripted_upsert' => true,
'script' => [
'source' => "
ctx._source[params.field] += params.delta;
ctx._source['@' + params.field + ':synced'] = params.synced;
",
'lang' => 'painless',
'params' => [
'field' => $key,
'delta' => $metric->getCount(),
'synced' => $metric->getSynced()
]
'doc_as_upsert' => true,
'doc' => [
$key => $metric->getCount(),
"@{$key}:synced" => $metric->getSynced()
],
];
......
......@@ -35,8 +35,7 @@ class Sync
*/
public function __construct(
$repository = null
)
{
) {
$this->repository = $repository ?: new Repository();
}
......@@ -118,10 +117,18 @@ class Sync
throw new Exception('Invalid metric');
}
// Get votes in time window
$guids = array_keys(iterator_to_array($this->{$metricMethod}(false, $this->from, $this->to)));
// Sync
$from = strtotime('-1 year') * 1000;
$to = time() * 1000;
$i = 0;
foreach ($this->{$metricMethod}() as $guid => $count) {
$iterator = $this->{$metricMethod}(true, $from, $to, $guids);
foreach ($iterator as $guid => $count) {
$countValue = $sign * $count;
$metric = new MetricsSync();
......@@ -132,7 +139,7 @@ class Sync
->setCount($countValue)
->setSynced(time());
try {
$this->repository->inc($metric);
$this->repository->add($metric);
} catch (Exception $e) {
error_log((string) $e);
}
......@@ -144,33 +151,52 @@ class Sync
$this->repository->bulk();
}
/**
* @param bool $uniques
* @param int $from
* @param int $to
* @param array|null $guids
* @return iterable
*/
protected function getVotesUp(): iterable
protected function getVotesUp(bool $uniques, int $from, int $to, ?array $guids = null): iterable
{
$aggregates = new Aggregates\Votes;
$aggregates->setLimit(10000);
$aggregates->setType($this->type);
$aggregates->setSubtype($this->subtype);
$aggregates->setFrom($this->from);
$aggregates->setTo($this->to);
$aggregates
->setLimit(10000)
->setType($this->type)
->setSubtype($this->subtype)
->setFrom($from)
->setTo($to)
->setUniques($uniques);
if ($guids) {
$aggregates->setGuids($guids);
}
return $aggregates->get();
}
/**
* @param bool $uniques
* @param int $from
* @param int $to
* @param array|null $guids
* @return iterable
*/
protected function getVotesDown(): iterable
protected function getVotesDown(bool $uniques, int $from, int $to, ?array $guids = null): iterable
{
$aggregates = new Aggregates\DownVotes;
$aggregates->setLimit(10000);
$aggregates->setType($this->type);
$aggregates->setSubtype($this->subtype);
$aggregates->setFrom($this->from);
$aggregates->setTo($this->to);
$aggregates
->setLimit(10000)
->setType($this->type)
->setSubtype($this->subtype)
->setFrom($from)
->setTo($to)
->setUniques($uniques);
if ($guids) {
$aggregates->setGuids($guids);
}
return $aggregates->get();
}
......
......@@ -5,6 +5,7 @@
namespace Minds\Core\Trending\Aggregates;
use Minds\Core\Data\ElasticSearch;
use Minds\Helpers\Text;
class DownVotes extends Aggregate
{
......@@ -12,6 +13,9 @@ class DownVotes extends Aggregate
protected $uniques = true;
/** @var string[] */
protected $guids;
/**
* @param bool $uniques
* @return DownVotes
......@@ -22,6 +26,16 @@ class DownVotes extends Aggregate
return $this;
}
/**
* @param string[] $guids
* @return DownVotes
*/
public function setGuids(array $guids): DownVotes
{
$this->guids = $guids;
return $this;
}
public function get()
{
$filter = [
......@@ -75,6 +89,12 @@ class DownVotes extends Aggregate
// 'rating' => $this->rating
//];
if ($this->guids) {
$must[]['terms'] = [
'entity_guid' => Text::buildArray($this->guids),
];
}
$query = [
'index' => 'minds-metrics-*',
'type' => 'action',
......@@ -92,19 +112,23 @@ class DownVotes extends Aggregate
'field' => "$field.keyword",
'size' => $this->limit,
// 'order' => [ 'uniques' => 'DESC' ],
],
'aggs' => [
'uniques' => [
'cardinality' => [
'field' => "$cardinality_field.keyword"
]
]
]
]
]
]
];
if ($this->uniques) {
$query['body']['aggs']['entities']['aggs'] = [
'uniques' => [
'cardinality' => [
'field' => "$cardinality_field.keyword",
//'precision_threshold' => 40000
]
]
];
}
$prepared = new ElasticSearch\Prepared\Search();
$prepared->query($query);
......
......@@ -5,12 +5,16 @@
namespace Minds\Core\Trending\Aggregates;
use Minds\Core\Data\ElasticSearch;
use Minds\Helpers\Text;
class Votes extends Aggregate
{
/** @var bool */
protected $uniques = true;
/** @var string[] */
protected $guids;
protected $multiplier = 1;
private $page = -1;
......@@ -27,6 +31,16 @@ class Votes extends Aggregate
return $this;
}
/**
* @param string[] $guids
* @return Votes
*/
public function setGuids(array $guids): Votes
{
$this->guids = $guids;
return $this;
}
public function fetch()
{
$filter = [
......@@ -77,6 +91,12 @@ class Votes extends Aggregate
$field = 'entity_owner_guid';
}
if ($this->guids) {
$must[]['terms'] = [
'entity_guid' => Text::buildArray($this->guids),
];
}
//$must[]['match'] = [
// 'rating' => $this->rating
//];
......@@ -102,20 +122,23 @@ class Votes extends Aggregate
'num_partitions' => $this->partitions,
],
// 'order' => [ 'uniques' => 'DESC' ],
],
'aggs' => [
'uniques' => [
'cardinality' => [
'field' => "$cardinality_field.keyword",
//'precision_threshold' => 40000
]
]
]
]
]
]
];
if ($this->uniques) {
$query['body']['aggs']['entities']['aggs'] = [
'uniques' => [
'cardinality' => [
'field' => "$cardinality_field.keyword",
//'precision_threshold' => 40000
]
]
];
}
$prepared = new ElasticSearch\Prepared\Search();
$prepared->query($query);
......
Please register or to comment