...
 
Commits (3)
<?php
/**
* Dispatcher
* @author edgebal
*/
namespace Minds\Core\Boost\Campaigns;
......@@ -21,6 +17,12 @@ class Dispatcher
/** @var int */
protected $impressionsSyncThreshold;
/** @var Campaign */
protected $campaign;
/** @var int */
protected $now;
/**
* Dispatcher constructor.
* @param Manager $manager
......@@ -39,37 +41,58 @@ class Dispatcher
}
/**
* @param Campaign $campaignRef
* @return bool
* @throws CampaignException
* @param string $campaignUrn
* @throws Exception
*/
public function onLifecycle(Campaign $campaignRef)
public function onLifecycle(string $campaignUrn)
{
$now = time() * 1000;
$this->now = time() * 1000;
$this->campaign = $this->manager->get($campaignUrn);
$this->metrics->setCampaign($this->campaign);
$campaign = $this->manager->get($campaignRef->getUrn());
$this->syncIfImpressionsThresholdMet();
$this->completeCampaign();
$this->startCampaign();
}
if ($campaign->isDelivering()) {
$impressionsMet = $campaign->getImpressionsMet();
$campaign = $this->metrics
->setCampaign($campaign)
->syncImpressionsMet();
/**
* Sync to database if impressions threshold is met
* @throws Exception
*/
public function syncIfImpressionsThresholdMet(): void
{
if ($this->campaign->isDelivering()) {
$impressionsMet = $this->campaign->getImpressionsMet();
$this->campaign = $this->metrics->syncImpressionsMet();
if ($campaign->getImpressionsMet() - $impressionsMet >= $this->impressionsSyncThreshold) {
error_log("[BoostCampaignsDispatcher] Saving updated {$campaign->getUrn()}...");
$this->manager->sync($campaign);
if ($this->campaign->getImpressionsMet() - $impressionsMet >= $this->impressionsSyncThreshold) {
error_log("[BoostCampaignsDispatcher] Saving updated {$this->campaign->getUrn()}...");
$this->manager->sync($this->campaign);
}
}
}
if ($campaign->shouldBeCompleted($now)) {
error_log("[BoostCampaignsDispatcher] Completing {$campaign->getUrn()}...");
$this->manager->complete($campaign);
} elseif ($campaign->shouldBeStarted($now)) {
error_log("[BoostCampaignsDispatcher] Starting {$campaign->getUrn()}...");
$this->manager->start($campaign);
/**
* Record the campaign as completed
* @throws CampaignException
*/
public function completeCampaign(): void
{
if ($this->campaign->shouldBeCompleted($this->now)) {
error_log("[BoostCampaignsDispatcher] Completing {$this->campaign->getUrn()}...");
$this->manager->complete($this->campaign);
}
}
return true;
/**
* Record the campaign as started
* @throws CampaignException
*/
public function startCampaign(): void
{
if ($this->campaign->shouldBeStarted($this->now)) {
error_log("[BoostCampaignsDispatcher] Starting {$this->campaign->getUrn()}...");
$this->manager->start($this->campaign);
}
}
}
<?php
/**
* ElasticRepository
* @author edgebal
*/
namespace Minds\Core\Boost\Campaigns;
......@@ -22,14 +18,21 @@ class ElasticRepository
/** @var ElasticSearchClient */
protected $es;
/** @var ElasticRepositoryQueryBuilder */
protected $queryBuilder;
/**
* ElasticRepository constructor.
* @param ElasticSearchClient $es
* Options for fetching queries
* @var array
*/
protected $opts;
public function __construct(
$es = null
?ElasticSearchClient $es = null,
?ElasticRepositoryQueryBuilder $queryBuilder = null
) {
$this->es = $es ?: Di::_()->get('Database\ElasticSearch');
$this->queryBuilder = $queryBuilder ?: new ElasticRepositoryQueryBuilder();
}
/**
......@@ -38,7 +41,7 @@ class ElasticRepository
*/
public function getList(array $opts = [])
{
$opts = array_merge([
$this->opts = array_merge([
'limit' => 12,
'from' => 0,
'offset' => null,
......@@ -52,132 +55,16 @@ class ElasticRepository
'quality' => null,
], $opts);
$must = [];
$must_not = [];
$sort = [
'@timestamp' => $opts['sort'],
];
//
if ($opts['type']) {
$must[] = [
'term' => [
'type' => $opts['type'],
],
];
}
if ($opts['guid']) {
$must[] = [
'term' => [
'_id' => (string) $opts['guid'],
],
];
} elseif ($opts['owner_guid']) {
$must[] = [
'term' => [
'owner_guid' => (string) $opts['owner_guid'],
],
];
}
if ($opts['entity_urn']) {
$must[] = [
'term' => [
'entity_urn' => $opts['entity_urn'],
],
];
}
if ($opts['state'] === 'approved') {
$must[] = [
'exists' => [
'field' => '@reviewed',
],
];
} elseif ($opts['state'] === 'in_review') {
$must_not[] = [
'exists' => [
'field' => '@reviewed',
],
];
}
if ($opts['state'] === 'approved' || $opts['state'] === 'in_review') {
$must_not[] = [
'exists' => [
'field' => '@completed',
],
];
$must_not[] = [
'exists' => [
'field' => '@rejected',
],
];
$must_not[] = [
'exists' => [
'field' => '@revoked',
],
];
}
if ($opts['rating']) {
$must[] = [
'range' => [
'rating' => [
'lte' => $opts['rating'],
],
],
];
}
if ($opts['quality']) {
$must[] = [
'range' => [
'rating' => [
'gte' => $opts['rating'],
],
],
];
}
if ($opts['offset']) {
$rangeKey = $opts['sort'] === 'asc' ? 'gt' : 'lt';
$must[] = [
'range' => [
'@timestamp' => [
$rangeKey => $opts['offset'],
],
],
];
}
//
$body = [
'query' => [
'bool' => [
'must' => $must,
'must_not' => $must_not,
],
],
'sort' => $sort,
];
$this->queryBuilder->setOpts($opts);
$prepared = new Search();
$prepared->query([
'index' => 'minds-boost-campaigns',
'type' => '_doc',
'body' => $body,
'size' => $opts['limit'],
'from' => (int) ($opts['from'] ?? 0),
'body' => $this->queryBuilder->query(),
'size' => $this->opts['limit'],
'from' => (int)($this->opts['from'] ?? 0),
]);
$result = $this->es->request($prepared);
......@@ -205,11 +92,11 @@ class ElasticRepository
->setImpressionsMet($doc['_source']['impressions_met'])
->setRating($doc['_source']['rating'])
->setQuality($doc['_source']['quality'])
->setCreatedTimestamp(((int) $doc['_source']['@timestamp']) ?: null)
->setReviewedTimestamp(((int) $doc['_source']['@reviewed']) ?: null)
->setRejectedTimestamp(((int) $doc['_source']['@rejected']) ?: null)
->setRevokedTimestamp(((int) $doc['_source']['@revoked']) ?: null)
->setCompletedTimestamp(((int) $doc['_source']['@completed']) ?: null);
->setCreatedTimestamp(($doc['_source']['@timestamp']) ?? null)
->setReviewedTimestamp(($doc['_source']['@reviewed']) ?? null)
->setRejectedTimestamp(($doc['_source']['@rejected']) ?? null)
->setRevokedTimestamp(($doc['_source']['@revoked']) ?? null)
->setCompletedTimestamp(($doc['_source']['@completed']) ?? null);
$response[] = $campaign;
$offset = $campaign->getCreatedTimestamp();
......@@ -222,7 +109,7 @@ class ElasticRepository
public function fetch(array $opts = [])
{
$opts = array_merge([
$this->opts = array_merge([
'limit' => 24,
'from' => 0,
'rating' => null,
......@@ -231,48 +118,15 @@ class ElasticRepository
'sort' => 'asc',
], $opts);
$filter = [];
$must = [];
if ($opts['offset']) {
$rangeKey = $opts['sort'] === 'asc' ? 'gt' : 'lt';
$must[] = [
'range' => [
'@timestamp' => [
$rangeKey => $opts['offset'],
],
],
];
}
if ($opts['type']) {
$filter[] = [
'term' => [
'type' => $opts['type'],
],
];
}
$sort = ['@timestamp' => $opts['sort'] ?? 'asc'];
$body = [
'query' => [
'bool' => [
'filter' => $filter,
'must' => $must,
],
],
'sort' => $sort,
];
$this->queryBuilder->setOpts($this->opts);
$prepared = new Search();
$prepared->query([
'index' => 'minds-boost,minds-boost-campaigns',
'type' => '_doc',
'body' => $body,
'from' => $opts['from'] ?? 0,
'size' => $opts['limit'],
'body' => $this->queryBuilder->query(),
'from' => $this->opts['from'] ?? 0,
'size' => $this->opts['limit'],
]);
$result = $this->es->request($prepared);
......@@ -304,7 +158,7 @@ class ElasticRepository
$data[] = $entity;
}
$data = array_slice($data, 0, $opts['limit']);
$data = array_slice($data, 0, $this->opts['limit']);
$response = new Response($data, count($data));
......
<?php
namespace Minds\Core\Boost\Campaigns;
class ElasticRepositoryQueryBuilder
{
protected $opts;
protected $must;
protected $mustNot;
protected $sort;
public function setOpts(array $opts): self
{
$this->opts = $opts;
return $this;
}
public function reset()
{
$this->opts = [];
$this->must = [];
$this->mustNot = [];
$this->sort = [];
}
public function query(): array
{
$this->reset();
$this->parseType();
$this->parseGuid();
$this->parseEntityUrn();
$this->parseState();
$this->parseRating();
$this->parseQuality();
$this->parseOffset();
$this->parseSort();
return $this->body();
}
public function parseType(): void
{
if ($this->opts['type']) {
$this->must[] = [
'term' => [
'type' => $this->opts['type'],
],
];
}
}
public function parseGuid(): void
{
if ($this->opts['guid']) {
$this->must[] = [
'term' => [
'_id' => (string)$this->opts['guid'],
],
];
} elseif ($this->opts['owner_guid']) {
$this->must[] = [
'term' => [
'owner_guid' => (string)$this->opts['owner_guid'],
],
];
}
}
public function parseEntityUrn(): void
{
if ($this->opts['entity_urn']) {
$this->must[] = [
'term' => [
'entity_urn' => $this->opts['entity_urn'],
],
];
}
}
public function parseState(): void
{
if ($this->opts['state'] === 'approved') {
$this->must[] = [
'exists' => [
'field' => '@reviewed',
],
];
} elseif ($this->opts['state'] === 'in_review') {
$this->mustNot[] = [
'exists' => [
'field' => '@reviewed',
],
];
}
if ($this->opts['state'] === 'approved' || $this->opts['state'] === 'in_review') {
$this->mustNot[] = [
'exists' => [
'field' => '@completed',
],
];
$this->mustNot[] = [
'exists' => [
'field' => '@rejected',
],
];
$this->mustNot[] = [
'exists' => [
'field' => '@revoked',
],
];
}
}
public function parseRating(): void
{
if ($this->opts['rating']) {
$this->must[] = [
'range' => [
'rating' => [
'lte' => $this->opts['rating'],
],
],
];
}
}
public function parseQuality(): void
{
if ($this->opts['quality']) {
$this->must[] = [
'range' => [
'quality' => [
'gte' => $this->opts['quality'],
],
],
];
}
}
public function parseOffset(): void
{
if ($this->opts['offset']) {
$rangeKey = $this->opts['sort'] === 'asc' ? 'gt' : 'lt';
$this->must[] = [
'range' => [
'@timestamp' => [
$rangeKey => $this->opts['offset'],
],
],
];
}
}
public function parseSort(): void
{
$this->sort = [
'@timestamp' => $this->opts['sort'] ?? 'asc',
];
}
public function body(): array
{
return [
'query' => [
'bool' => [
'must' => $this->must,
'must_not' => $this->mustNot,
],
],
'sort' => $this->sort,
];
}
}
......@@ -145,19 +145,18 @@ class Manager
}
/**
* @param $urn
* @param string $urn
* @param array $opts
* @return Campaign|null
* @throws Exception
*/
public function get($urn, array $opts = [])
public function get(string $urn, array $opts = [])
{
$opts = array_merge([
'useElastic' => false
], $opts);
$urn = new Urn($urn);
$guid = $urn->getNss();
$guid = (new Urn($urn))->getNss();
if (!$guid) {
return null;
......
......@@ -21,11 +21,11 @@ class BoostCampaignDispatcher implements Interfaces\QueueRunner
/** @var Campaign $campaignRef */
$campaignRef = is_string($data['campaign']) ? unserialize($data['campaign']) : $data['campaign'];
echo "Checking boost campaign {$campaignRef->getUrn()} status...\n";
error_log("Checking boost campaign {$campaignRef->getUrn()} status...");
/** @var Dispatcher $dispatcher */
$dispatcher = Di::_()->get('Boost\Campaigns\Dispatcher');
$dispatcher->onLifecycle($campaignRef);
$dispatcher->onLifecycle($campaignRef->getUrn());
});
}
}
<?php
namespace Spec\Minds\Core\Boost\Campaigns;
use Minds\Core\Boost\Campaigns\CampaignException;
use PhpSpec\ObjectBehavior;
use Prophecy\Argument;
class CampaignExceptionSpec extends ObjectBehavior
{
public function it_is_initializable()
{
$this->shouldHaveType(CampaignException::class);
}
}
......@@ -2,7 +2,6 @@
namespace Spec\Minds\Core\Boost\Campaigns;
use Exception;
use Minds\Core\Boost\Campaigns\Campaign;
use Minds\Core\Boost\Campaigns\Dispatcher;
use Minds\Core\Boost\Campaigns\Manager;
......@@ -33,14 +32,9 @@ class DispatcherSpec extends ObjectBehavior
}
public function it_should_not_sync_campaign_on_lifecycle_if_below_threshold(
Campaign $campaignRef,
Campaign $initialCampaign,
Campaign $campaign
) {
$campaignRef->getUrn()
->shouldBeCalled()
->willReturn('urn:campaign:1000');
$campaign->getUrn()
->willReturn('urn:campaign:1000');
......@@ -81,20 +75,13 @@ class DispatcherSpec extends ObjectBehavior
->shouldBeCalled()
->willReturn(false);
$this
->onLifecycle($campaignRef)
->shouldReturn(true);
$this->onLifecycle('urn:campaign:1000');
}
public function it_should_sync_campaign_on_lifecycle_if_above_threshold(
Campaign $campaignRef,
Campaign $initialCampaign,
Campaign $campaign
) {
$campaignRef->getUrn()
->shouldBeCalled()
->willReturn('urn:campaign:1000');
$campaign->getUrn()
->willReturn('urn:campaign:1000');
......@@ -136,19 +123,12 @@ class DispatcherSpec extends ObjectBehavior
->shouldBeCalled()
->willReturn(false);
$this
->onLifecycle($campaignRef)
->shouldReturn(true);
$this->onLifecycle('urn:campaign:1000');
}
public function it_should_complete_campaign(
Campaign $campaignRef,
Campaign $campaign
) {
$campaignRef->getUrn()
->shouldBeCalled()
->willReturn('urn:campaign:1000');
$campaign->getUrn()
->willReturn('urn:campaign:1000');
......@@ -171,19 +151,12 @@ class DispatcherSpec extends ObjectBehavior
$campaign->shouldBeStarted(Argument::any())
->willReturn(false);
$this
->onLifecycle($campaignRef)
->shouldReturn(true);
$this->onLifecycle('urn:campaign:1000');
}
public function it_should_start_campaign(
Campaign $campaignRef,
Campaign $campaign
) {
$campaignRef->getUrn()
->shouldBeCalled()
->willReturn('urn:campaign:1000');
$campaign->getUrn()
->willReturn('urn:campaign:1000');
......@@ -207,8 +180,6 @@ class DispatcherSpec extends ObjectBehavior
->shouldBeCalled()
->willReturn(true);
$this
->onLifecycle($campaignRef)
->shouldReturn(true);
$this->onLifecycle('urn:campaign:1000');
}
}
......@@ -2,14 +2,167 @@
namespace Spec\Minds\Core\Boost\Campaigns;
use Minds\Core\Boost\Campaigns\Campaign;
use Minds\Core\Boost\Campaigns\ElasticRepository;
use Minds\Core\Boost\Campaigns\ElasticRepositoryQueryBuilder;
use Minds\Core\Boost\Network\Boost;
use Minds\Core\Data\ElasticSearch\Client;
use PhpSpec\ObjectBehavior;
use Prophecy\Argument;
use Minds\Core\Data\ElasticSearch\Prepared;
class ElasticRepositorySpec extends ObjectBehavior
{
/** @var Client */
protected $client;
/** @var ElasticRepositoryQueryBuilder */
protected $queryBuilder;
public function let(Client $client, ElasticRepositoryQueryBuilder $queryBuilder)
{
$this->beConstructedWith($client, $queryBuilder);
$this->client = $client;
$this->queryBuilder = $queryBuilder;
}
public function it_is_initializable()
{
$this->shouldHaveType(ElasticRepository::class);
}
public function it_should_return_a_list_of_campaigns_matching_criteria()
{
$esResult = [
'hits' => [
'hits' => [
[
'_id' => 1234,
'_source' => [
'type' => 'newsfeed',
'owner_guid' => 1234,
'name' => 'test',
'entity_urns' => null,
'hashtags' => null,
'nsfw' => null,
'start' => 123456,
'end' => 234567,
'budget' => 1,
'budget_type' => 'tokens',
'checksum' => '0xdeadbeef',
'impressions' => null,
'impressions_met' => null,
'rating' => 0,
'quality' => 0,
'@created' => 12345
]
]
]
]
];
$this->queryBuilder->setOpts(Argument::type('array'))->shouldBeCalled();
$this->queryBuilder->query()->shouldBeCalled();
$this->client->request(Argument::type(Prepared\Search::class))->shouldBeCalled()->willReturn($esResult);
$return = $this->getList(['guid' => 1234]);
$return[0]->shouldBeAnInstanceOf(Campaign::class);
}
public function it_should_return_a_list_of_boosts_and_campaigns_matching_criteria()
{
$esResult = [
'hits' => [
'hits' => [
[
'_index' => 'minds-boost-campaigns',
'_id' => 1234,
'_source' => [
'type' => 'newsfeed',
'owner_guid' => 1234,
'@timestamp' => 12345
]
],
[
'_index' => 'minds-boost',
'_id' => 1234,
'_source' => [
'type' => 'newsfeed',
'owner_guid' => 1234,
'@timestamp' => 12345
]
]
]
]
];
$this->queryBuilder->setOpts(Argument::type('array'))->shouldBeCalled();
$this->queryBuilder->query()->shouldBeCalled();
$this->client->request(Argument::type(Prepared\Search::class))->shouldBeCalled()->willReturn($esResult);
$return = $this->fetch(['quality' => 5]);
$return[0]->shouldBeAnInstanceOf(Campaign::class);
$return[1]->shouldBeAnInstanceOf(Boost::class);
}
public function it_should_add_an_entry_to_es(Campaign $campaign)
{
$campaign->getType()->willReturn('newsfeed');
$campaign->getOwnerGuid()->willReturn('1234');
$campaign->getName()->willReturn('test');
$campaign->getEntityUrns()->willReturn(null);
$campaign->getHashtags()->willReturn(null);
$campaign->getNsfw()->willReturn(null);
$campaign->getStart()->willReturn('12345');
$campaign->getEnd()->willReturn('23456');
$campaign->getBudget()->willReturn('1');
$campaign->getBudgetType()->willReturn('tokens');
$campaign->getChecksum()->willReturn('0xdeadbeef');
$campaign->getImpressions()->willReturn(null);
$campaign->getCreatedTimestamp()->willReturn('123456');
$campaign->getImpressionsMet()->willReturn('2');
$campaign->getRating()->willReturn('5');
$campaign->getQuality()->willReturn('5');
$campaign->getReviewedTimestamp()->willReturn(null);
$campaign->getRejectedTimestamp()->willReturn(null);
$campaign->getRevokedTimestamp()->willReturn(null);
$campaign->getCompletedTimestamp()->willReturn(null);
$campaign->getGuid()->willReturn('12345');
$this->client->request(Argument::type(Prepared\Update::class))->shouldBeCalled()->willReturn(true);
$this->add($campaign);
}
public function it_should_update_an_entry_to_es(Campaign $campaign)
{
$campaign->getType()->willReturn('newsfeed');
$campaign->getOwnerGuid()->willReturn('1234');
$campaign->getName()->willReturn('test');
$campaign->getEntityUrns()->willReturn(null);
$campaign->getHashtags()->willReturn(null);
$campaign->getNsfw()->willReturn(null);
$campaign->getStart()->willReturn('12345');
$campaign->getEnd()->willReturn('23456');
$campaign->getBudget()->willReturn('1');
$campaign->getBudgetType()->willReturn('tokens');
$campaign->getChecksum()->willReturn('0xdeadbeef');
$campaign->getImpressions()->willReturn(null);
$campaign->getCreatedTimestamp()->willReturn('123456');
$campaign->getImpressionsMet()->willReturn('2');
$campaign->getRating()->willReturn('5');
$campaign->getQuality()->willReturn('5');
$campaign->getReviewedTimestamp()->willReturn(null);
$campaign->getRejectedTimestamp()->willReturn(null);
$campaign->getRevokedTimestamp()->willReturn(null);
$campaign->getCompletedTimestamp()->willReturn(null);
$campaign->getGuid()->willReturn('12345');
$this->client->request(Argument::type(Prepared\Update::class))->shouldBeCalled()->willReturn(true);
$this->update($campaign);
}
public function it_should_delete_entry_in_es()
{
$this->shouldThrow('NotImplementedException')->duringDelete(new Campaign());
}
}