...
 
Commits (2)
......@@ -10,6 +10,8 @@ use Exception;
class Dispatcher
{
const SAVE_DELTA = 10;
/** @var Manager */
protected $manager;
......@@ -43,13 +45,15 @@ class Dispatcher
$campaign = $this->manager->get($campaignRef->getUrn());
if ($campaign->isDelivering()) {
$impressionsMet = $campaign->getImpressionsMet();
$campaign = $this->metrics
->setCampaign($campaign)
->syncImpressionsMet();
// TODO: Save campaign every 10/20 impressions to avoid ES overloading
error_log("[BoostCampaignsDispatcher] Saving updated {$campaign->getUrn()}...");
$this->manager->sync($campaign);
if ($campaign->getImpressionsMet() - $impressionsMet >= static::SAVE_DELTA) {
error_log("[BoostCampaignsDispatcher] Saving updated {$campaign->getUrn()}...");
$this->manager->sync($campaign);
}
}
if ($campaign->shouldBeCompleted($now)) {
......
......@@ -58,7 +58,7 @@ class ElasticRepository
$must_not = [];
$sort = [
'@timestamp' => $opts['sort']
'@timestamp' => $opts['sort'],
];
//
......@@ -88,8 +88,8 @@ class ElasticRepository
if ($opts['entity_urn']) {
$must[] = [
'term' => [
'entity_urn' => $opts['entity_urn']
]
'entity_urn' => $opts['entity_urn'],
],
];
}
......@@ -232,6 +232,7 @@ class ElasticRepository
'sort' => 'asc',
], $opts);
$filter = [];
$must = [];
if ($opts['offset']) {
......@@ -246,15 +247,24 @@ class ElasticRepository
];
}
if ($opts['type']) {
$filter[] = [
'term' => [
'type' => $opts['type'],
],
];
}
$sort = ['@timestamp' => $opts['sort'] ?? 'asc'];
$body = [
'query' => [
'bool' => [
'filter' => $filter,
'must' => $must,
],
],
'sort' => $sort
'sort' => $sort,
];
$prepared = new Search();
......@@ -268,6 +278,9 @@ class ElasticRepository
$result = $this->es->request($prepared);
var_dump($result); die();
$data = [];
foreach ($result['hits']['hits'] as $doc) {
......