Commit 6335dc83 authored by Mark Harding

(feat): sync cassandra views to elastic

parent 6d00dc53
No related merge requests found
Pipeline #66009345 (#755) passed with stages
in 7 minutes and 31 seconds
......@@ -85,7 +85,6 @@ class Analytics extends Cli\Controller implements Interfaces\CliControllerInterf
}
public function sync_graphs()
{
error_reporting(E_ALL);
ini_set('display_errors', 1);
......@@ -129,5 +128,31 @@ class Analytics extends Cli\Controller implements Interfaces\CliControllerInterf
}
$this->out('Completed caching site metrics');
}
/**
 * Copy views to Elasticsearch via the views manager, printing one progress
 * line per view and 'Done' at the end.
 *
 * CLI options:
 *   from  Unix timestamp (seconds) to start from. Defaults to 7 days ago.
 *
 * The day, month and year handed to the manager are all derived from `from`.
 */
public function syncViews()
{
    error_reporting(E_ALL);
    ini_set('display_errors', 1);

    // getOpt() may hand back a string; date() below needs an integer timestamp.
    $from = (int) ($this->getOpt('from') ?: strtotime('-7 days'));

    // Send the complete date. Passing only 'day' would let the manager apply
    // its own default month/year instead of the ones that belong to $from.
    $opts = [
        'from' => $from,
        'day' => (int) date('d', $from),
        'month' => (int) date('m', $from),
        'year' => (int) date('Y', $from),
    ];

    $manager = new Core\Analytics\Views\Manager();

    $i = 0;
    $start = time();

    foreach ($manager->syncToElastic($opts) as $view) {
        // The view uuid is a timeuuid; extract its embedded time for display.
        $time = (new \Cassandra\Timeuuid($view->getUuid()))->time();
        // 24-hour clock: a 12-hour value without am/pm is ambiguous in a log line.
        $date = date('d-m-Y H:i', $time);
        // Average throughput so far; "?: 1" avoids dividing by zero in the first second.
        $rps = (++$i) / ((time() - $start) ?: 1);
        $this->out($i . "-{$view->getUuid()} {$date} ($rps/sec)");
    }

    $this->out('Done');
}
}
<?php
/**
* ElasticRepository
* @author Mark
*/
namespace Minds\Core\Analytics\Views;
use DateTime;
use DateTimeZone;
use Exception;
use Minds\Common\Repository\Response;
use Minds\Core\Data\ElasticSearch\Client as ElasticClient;
use Minds\Core\Di\Di;
/**
 * Queues view documents and writes them to Elasticsearch through the bulk API.
 *
 * Callers add() views one at a time; the queue is flushed automatically once
 * it is full, and bulk() must be called at the end to write the remainder.
 */
class ElasticRepository
{
    /**
     * Queue length at which add() flushes. Every view contributes two queue
     * entries (action line + document line), so 2000 entries == 1000 views.
     */
    const BULK_FLUSH_THRESHOLD = 2000;

    /** @var ElasticClient */
    protected $es;

    /** @var array Queued bulk lines, alternating action metadata and document body */
    private $pendingBulkInserts = [];

    /**
     * Repository constructor.
     * @param ElasticClient $es Client to use; when falsy, the 'Database\ElasticSearch'
     *                          binding is fetched from the DI container.
     */
    public function __construct(
        $es = null
    )
    {
        $this->es = $es ?: Di::_()->get('Database\ElasticSearch');
    }

    /**
     * Not implemented yet: ignores $opts and always returns a new, untouched Response.
     * @param array $opts
     * @return Response
     */
    public function getList(array $opts = [])
    {
        $response = new Response();

        return $response;
    }

    /**
     * Queue a view as an upsert into the monthly index 'minds-views-<mm>-<YYYY>',
     * flushing the queue when it reaches BULK_FLUSH_THRESHOLD entries.
     *
     * @param View $view
     * @return void
     * @throws Exception NOTE(review): nothing here throws directly; presumably
     *                   propagated from the client during a flush - confirm.
     */
    public function add(View $view)
    {
        $index = 'minds-views-' . date('m-Y', $view->getTimestamp());

        $body = [
            'uuid' => $view->getUuid(),
            // The view timestamp is multiplied by 1000 before being stored.
            '@timestamp' => $view->getTimestamp() * 1000,
            'entity_urn' => $view->getEntityUrn(),
            'page_token' => $view->getPageToken(),
            'campaign' => $view->getCampaign(),
            'delta' => (int) $view->getDelta(),
            'medium' => $view->getMedium(),
            'platform' => $view->getPlatform(),
            'position' => (int) $view->getPosition(),
            'source' => $view->getSource(),
        ];

        // Drop unset fields. Only '' and null are removed, so integer 0
        // (e.g. delta/position after the casts above) is kept.
        $body = array_filter($body, function ($val) {
            return $val !== '' && $val !== null;
        });

        // Action line: update keyed by the view uuid.
        $this->pendingBulkInserts[] = [
            'update' => [
                '_id' => (string) $view->getUuid(),
                '_index' => $index,
                '_type' => '_doc',
            ],
        ];

        // Document line: doc_as_upsert makes the update insert when the id is new.
        $this->pendingBulkInserts[] = [
            'doc' => $body,
            'doc_as_upsert' => true,
        ];

        // ">=" so the flush happens at exactly 1000 views rather than 1001.
        if (count($this->pendingBulkInserts) >= self::BULK_FLUSH_THRESHOLD) {
            $this->bulk();
        }
    }

    /**
     * Send every queued entry in one bulk request and empty the queue.
     * Does nothing when the queue is empty.
     *
     * @return void
     */
    public function bulk()
    {
        if (count($this->pendingBulkInserts) === 0) {
            return;
        }

        $res = $this->es->bulk(['body' => $this->pendingBulkInserts]);
        $this->pendingBulkInserts = [];

        // A bulk request can succeed overall while individual items fail.
        // NOTE(review): assumes the client returns the decoded Elasticsearch
        // response array (with its 'errors' flag) - confirm against the client.
        if (is_array($res) && !empty($res['errors'])) {
            error_log('[Analytics\Views\ElasticRepository] bulk request reported item errors');
        }
    }
}
......@@ -13,11 +13,16 @@ class Manager
/** @var Repository */
protected $repository;
/** @var ElasticRepository */
protected $elasticRepository;
/**
 * Manager constructor.
 *
 * Stores the given repositories, creating a new Repository / ElasticRepository
 * for any argument that is falsy.
 *
 * NOTE(review): this span is taken from a diff view; the first bare
 * `$repository = null` line looks like the removed (pre-change) parameter
 * line shown next to its replacement - confirm against the actual file.
 *
 * @param Repository|null $repository
 * @param ElasticRepository|null $elasticRepository
 */
public function __construct(
$repository = null
$repository = null,
$elasticRepository = null
)
{
$this->repository = $repository ?: new Repository();
$this->elasticRepository = $elasticRepository ?: new ElasticRepository();
}
/**
......@@ -40,4 +45,40 @@ class Manager
return true;
}
/**
 * Synchronise views from cassandra to elastic.
 *
 * Reads views page by page from the repository, queues each one on the
 * elastic repository and yields it so the caller can report progress.
 * The queue is flushed once more after the last page. Because this is a
 * generator, nothing runs until the result is iterated.
 *
 * @param array $opts Overrides for the defaults below:
 *                    from, to, day, month, year, limit, offset
 * @return \Generator Yields each view after it has been queued
 */
public function syncToElastic($opts = [])
{
    // NOTE(review): the day/month/year defaults are fixed to 5 June 2019;
    // callers that want another date have to pass all three explicitly.
    $opts = array_merge([
        'from' => null,
        'to' => null, // previously read an undefined $to variable
        'day' => 5,
        'month' => 6,
        'year' => 2019,
        'limit' => 1000,
        'offset' => '',
    ], $opts);

    do {
        $result = $this->repository->getList($opts);

        // Carry the paging token forward so the next getList() call continues from here.
        $opts['offset'] = $result->getPagingToken();

        foreach ($result as $view) {
            $this->elasticRepository->add($view);
            yield $view;
        }
    } while (!$result->isLastPage());

    $this->elasticRepository->bulk(); // Save the final batch
}
}
......@@ -42,14 +42,43 @@ class Repository
$opts = array_merge([
'limit' => 500,
'offset' => '',
'year' => null,
'month' => null,
'day' => null,
'from' => null,
], $opts) ;
$cql = "SELECT * FROM views";
$values = [];
$cqlOpts = [];
$where = [];
// TODO: Implement constraints (by year/month/day/timeuuid)
if ($opts['year']) {
$where[] = 'year = ?';
$values[] = (int) $opts['year'];
}
if ($opts['month']) {
$where[] = 'month = ?';
$values[] = new Tinyint($opts['month']);
}
if ($opts['day']) {
$where[] = 'day = ?';
$values[] = new Tinyint($opts['day']);
}
if ($opts['from']) {
$where[] = 'uuid > ?';
$values[] = new Timeuuid($opts['from'] * 1000);
}
if (count($where)) {
$cql .= " WHERE " . implode(' AND ', $where);
}
if ($opts['limit']) {
$cqlOpts['page_size'] = (int) $opts['limit'];
}
......@@ -60,7 +89,7 @@ class Repository
$prepared = new Custom();
$prepared->query($cql, $values);
$prepared->setOpts($opts);
$prepared->setOpts($cqlOpts);
$response = new Response();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment