Commit d587d4cf authored by Emiliano Balbuena's avatar Emiliano Balbuena

(wip)(refactor): Campaigns should use their own ES index and cql table

1 merge request!235WIP: Boost Campaigns (&24)
Pipeline #68888799 failed with stages
in 3 minutes and 2 seconds
......@@ -92,7 +92,8 @@ class campaigns implements Interfaces\Api
if (!$isEditing) {
$campaign
->setType($_POST['type'] ?? '')
->setEntityUrns($_POST['entity_urns'] ?? []);
->setEntityUrns($_POST['entity_urns'] ?? [])
->setChecksum($_POST['checksum'] ?? '');
} else {
$campaign
->setUrn($urn);
......
......@@ -6,7 +6,9 @@
namespace Minds\Core\Boost\Campaigns;
use Exception;
use JsonSerializable;
use Minds\Common\Urn;
use Minds\Entities\User;
use Minds\Traits\MagicAttributes;
......@@ -25,12 +27,18 @@ use Minds\Traits\MagicAttributes;
* @method Campaign setEntityUrns(string[] $entityUrns)
* @method string[] getHashtags()
* @method Campaign setHashtags(string[] $hashtags)
* @method int[] getNsfw()
* @method Campaign setNsfw(int[] $hashtags)
* @method int getStart()
* @method Campaign setStart(int $start)
* @method int getEnd()
* @method Campaign setEnd(int $end)
* @method string getBudget()
* @method Campaign setBudget(string $budget)
* @method string getBudgetType()
* @method Campaign setBudgetType(string $budgetType)
* @method string getChecksum()
* @method Campaign setChecksum(string $checksum)
* @method int getImpressions()
* @method Campaign setImpressions(int $impressions)
* @method int getImpressionsMet()
......@@ -51,41 +59,44 @@ class Campaign implements JsonSerializable
use MagicAttributes;
/** @var string */
const COMPLETED_STATUS = 'completed';
const PENDING_STATUS = 'pending';
/** @var string */
const REJECTED_STATUS = 'rejected';
const CREATED_STATUS = 'created';
/** @var string */
const REVOKED_STATUS = 'revoked';
const APPROVED_STATUS = 'approved';
/** @var string */
const APPROVED_STATUS = 'approved';
const REJECTED_STATUS = 'rejected';
/** @var string */
const CREATED_STATUS = 'created';
const REVOKED_STATUS = 'revoked';
/** @var string */
const PENDING_STATUS = 'pending';
const COMPLETED_STATUS = 'completed';
/** @var string */
protected $urn;
/** @var string */
protected $type;
/** @var int|string */
protected $ownerGuid;
/** @var string */
protected $name;
/** @var string */
protected $type;
/** @var string[] */
protected $entityUrns = [];
/** @var string[] */
protected $hashtags;
/** @var int[] */
protected $nsfw;
/** @var int */
protected $start;
......@@ -95,6 +106,12 @@ class Campaign implements JsonSerializable
/** @var string */
protected $budget;
/** @var string */
protected $budgetType;
/** @var string */
protected $checksum;
/** @var int */
protected $impressions;
......@@ -116,6 +133,22 @@ class Campaign implements JsonSerializable
/** @var int */
protected $completedTimestamp;
/**
* @return int|string
*/
public function getGuid()
{
if (!$this->urn) {
return '';
}
try {
return (new Urn($this->urn))->getNss();
} catch (Exception $exception) {
return '';
}
}
/**
* @param User $owner
* @return Campaign
......@@ -164,23 +197,26 @@ class Campaign implements JsonSerializable
public function export()
{
return [
'name' => $this->name,
'type' => $this->type,
'urn' => $this->urn,
'name' => $this->name,
'entity_urns' => $this->entityUrns,
'hashtags' => $this->hashtags,
'nsfw' => $this->nsfw,
'start' => $this->start,
'end' => $this->end,
'budget' => $this->budget,
'urn' => $this->urn,
'delivery_status' => $this->getDeliveryStatus(),
'budget_type' => $this->budgetType,
'checksum' => $this->checksum,
'impressions' => $this->impressions,
'impressions_met' => $this->impressionsMet,
'cpm' => $this->cpm(),
'created_timestamp' => $this->createdTimestamp,
'reviewed_timestamp' => $this->reviewedTimestamp,
'revoked_timestamp' => $this->revokedTimestamp,
'rejected_timestamp' => $this->rejectedTimestamp,
'completed_timestamp' => $this->completedTimestamp,
'delivery_status' => $this->getDeliveryStatus(),
'cpm' => $this->cpm(),
];
}
......
......@@ -40,6 +40,8 @@ class BudgetDelegate
throw new CampaignException('Campaign should have a budget');
}
$campaign->setBudgetType('tokens');
$campaign = $this->updateImpressionsByCpm($campaign);
// TODO: Validate offchain balance, or set as pending for onchain
......
<?php
/**
* ElasticRepository
* @author edgebal
*/
namespace Minds\Core\Boost\Campaigns;
use Exception;
use Minds\Common\Repository\Response;
use Minds\Core\Data\ElasticSearch\Client as ElasticSearchClient;
use Minds\Core\Data\ElasticSearch\Prepared\Search;
use Minds\Core\Data\ElasticSearch\Prepared\Update;
use Minds\Core\Di\Di;
use Minds\Helpers\Number;
use Minds\Helpers\Text;
use NotImplementedException;
class ElasticRepository
{
/** @var ElasticSearchClient */
protected $es;
/**
* ElasticRepository constructor.
* @param ElasticSearchClient $es
*/
public function __construct(
$es = null
)
{
$this->es = $es ?: Di::_()->get('Database\ElasticSearch');
}
/**
* @param array $opts
* @return Response
*/
public function getList(array $opts = [])
{
$opts = array_merge([
'limit' => 12,
'from' => 0,
'offset' => null,
'sort' => 'asc',
'type' => null,
'guid' => null,
'owner_guid' => null,
'entity_urn' => null,
'state' => null,
], $opts);
$must = [];
$must_not = [];
$sort = [
'@timestamp' => $opts['sort']
];
//
if ($opts['type']) {
$must[] = [
'term' => [
'type' => $opts['type'],
],
];
}
if ($opts['guid']) {
$must[] = [
'term' => [
'_id' => (string) $opts['guid'],
],
];
} elseif ($opts['owner_guid']) {
$must[] = [
'term' => [
'owner_guid' => (string) $opts['owner_guid'],
],
];
}
if ($opts['entity_urn']) {
$must[] = [
'term' => [
'entity_urn' => $opts['entity_urn']
]
];
}
if ($opts['state'] === 'approved') {
$must[] = [
'exists' => [
'field' => '@reviewed',
],
];
} elseif ($opts['state'] === 'in_review') {
$must_not[] = [
'exists' => [
'field' => '@reviewed',
],
];
}
if ($opts['state'] === 'approved' || $opts['state'] === 'in_review') {
$must_not[] = [
'exists' => [
'field' => '@completed',
],
];
$must_not[] = [
'exists' => [
'field' => '@rejected',
],
];
$must_not[] = [
'exists' => [
'field' => '@revoked',
],
];
}
if ($opts['offset']) {
$rangeKey = $opts['sort'] === 'asc' ? 'gt' : 'lt';
$must[] = [
'range' => [
'@timestamp' => [
$rangeKey => $opts['offset'],
],
],
];
}
//
$body = [
'query' => [
'bool' => [
'must' => $must,
'must_not' => $must_not,
],
],
'sort' => $sort,
];
$prepared = new Search();
$prepared->query([
'index' => 'minds-boost-campaigns',
'type' => '_doc',
'body' => $body,
'size' => $opts['limit'],
'from' => (int) ($opts['from'] ?? 0),
]);
$result = $this->es->request($prepared);
$response = new Response();
$offset = 0;
foreach ($result['hits']['hits'] as $doc) {
$campaign = new Campaign();
$campaign
->setUrn("urn:campaign:{$doc['_id']}")
->setType($doc['_source']['type'])
->setOwnerGuid($doc['_source']['owner_guid'])
->setName($doc['_source']['name'])
->setEntityUrns(Text::buildArray($doc['_source']['entity_urns']))
->setHashtags(Text::buildArray($doc['_source']['hashtags']))
->setNsfw(Number::buildIntArray($doc['_source']['nsfw']))
->setStart((int) $doc['_source']['start'])
->setEnd((int) $doc['_source']['end'])
->setBudget((string) $doc['_source']['budget'])
->setBudgetType($doc['_source']['budget_type'])
->setChecksum($doc['_source']['checksum'])
->setImpressions((int) $doc['_source']['impressions'])
->setImpressionsMet($doc['_source']['impressions_met'])
->setCreatedTimestamp((int) $doc['_source']['@timestamp'])
->setReviewedTimestamp((int) $doc['_source']['@reviewed'])
->setRejectedTimestamp((int) $doc['_source']['@rejected'])
->setRevokedTimestamp((int) $doc['_source']['@revoked'])
->setCompletedTimestamp((int) $doc['_source']['@completed']);
$response[] = $campaign;
$offset = $campaign->getCreatedTimestamp();
}
$response->setPagingToken($offset);
return $response;
}
/**
* @param Campaign $campaign
* @return bool
* @throws Exception
*/
public function add(Campaign $campaign)
{
$body = [
'doc' => [
'type' => $campaign->getType(),
'owner_guid' => (string) $campaign->getOwnerGuid(),
'name' => $campaign->getName(),
'entity_urns' => $campaign->getEntityUrns(),
'hashtags' => $campaign->getHashtags(),
'nsfw' => $campaign->getNsfw(),
'start' => (int) $campaign->getStart(),
'end' => (int) $campaign->getEnd(),
'budget' => $campaign->getBudget(),
'budget_type' => $campaign->getBudgetType(),
'checksum' => $campaign->getChecksum(),
'impressions' => $campaign->getImpressions(),
'@timestamp' => $campaign->getCreatedTimestamp(),
],
'doc_as_upsert' => true,
];
if ($campaign->getImpressionsMet()) {
$body['doc']['impressions_met'] = $campaign->getImpressionsMet();
}
if ($campaign->getReviewedTimestamp()) {
$body['doc']['@reviewed'] = $campaign->getReviewedTimestamp();
}
if ($campaign->getRejectedTimestamp()) {
$body['doc']['@rejected'] = $campaign->getRejectedTimestamp();
}
if ($campaign->getRevokedTimestamp()) {
$body['doc']['@revoked'] = $campaign->getRevokedTimestamp();
}
if ($campaign->getCompletedTimestamp()) {
$body['doc']['@completed'] = $campaign->getCompletedTimestamp();
}
$prepared = new Update();
$prepared->query([
'index' => 'minds-boost-campaigns',
'type' => '_doc',
'body' => $body,
'id' => (string) $campaign->getGuid(),
]);
return (bool) $this->es->request($prepared);
}
/**
* @param Campaign $campaign
* @return bool
* @throws Exception
*/
public function update(Campaign $campaign)
{
return $this->add($campaign);
}
/**
* @param Campaign $campaign
* @throws NotImplementedException
*/
public function delete(Campaign $campaign)
{
throw new NotImplementedException();
}
}
......@@ -16,6 +16,9 @@ class Manager
/** @var Repository */
protected $repository;
/** @var ElasticRepository */
protected $elasticRepository;
/** @var Delegates\CampaignUrnDelegate */
protected $campaignUrnDelegate;
......@@ -37,6 +40,7 @@ class Manager
/**
* Manager constructor.
* @param Repository $repository
* @param ElasticRepository $elasticRepository
* @param Delegates\CampaignUrnDelegate $campaignUrnDelegate
* @param Delegates\NormalizeDatesDelegate $normalizeDatesDelegate
* @param Delegates\NormalizeEntityUrnsDelegate $normalizeEntityUrnsDelegate
......@@ -45,6 +49,7 @@ class Manager
*/
public function __construct(
$repository = null,
$elasticRepository = null,
$campaignUrnDelegate = null,
$normalizeDatesDelegate = null,
$normalizeEntityUrnsDelegate = null,
......@@ -53,6 +58,7 @@ class Manager
)
{
$this->repository = $repository ?: new Repository();
$this->elasticRepository = $elasticRepository ?: new ElasticRepository();
// Delegates
......@@ -79,7 +85,7 @@ class Manager
*/
public function getList(array $opts = [])
{
return $this->repository->getList($opts);
return $this->elasticRepository->getList($opts);
}
/**
......@@ -96,7 +102,7 @@ class Manager
return null;
}
$campaigns = $this->repository->getList([
$campaigns = $this->elasticRepository->getList([
'guid' => $guid
])->toArray();
......@@ -142,6 +148,12 @@ class Manager
throw new CampaignException('Invalid campaign type');
}
// Validate checksum
if (!$campaign->getChecksum()) {
throw new CampaignException('Invalid checksum value');
}
// Run delegates
$campaign = $this->normalizeDatesDelegate->onCreate($campaign);
......@@ -154,11 +166,12 @@ class Manager
$campaign
->setCreatedTimestamp(time() * 1000);
$done = $this->repository->add($campaign);
// Write
if (!$done) {
throw new CampaignException('Cannot save campaign');
}
$this->repository->add($campaign);
$this->elasticRepository->add($campaign);
//
return $campaign;
}
......@@ -208,11 +221,12 @@ class Manager
$campaign = $this->normalizeHashtagsDelegate->onUpdate($campaign, $campaignRef);
$campaign = $this->budgetDelegate->onUpdate($campaign, $campaignRef); // Should be ALWAYS called after normalizing dates
$done = $this->repository->update($campaign);
// Write
if (!$done) {
throw new CampaignException('Cannot save campaign');
}
$this->repository->update($campaign);
$this->elasticRepository->update($campaign);
//
return $campaign;
}
......@@ -251,11 +265,12 @@ class Manager
->setStart($now) // Update start date so we can calculate distribution correctly
->setReviewedTimestamp($now);
$done = $this->repository->update($campaign);
// Write
if (!$done) {
throw new CampaignException('Cannot save campaign');
}
$this->repository->update($campaign);
$this->elasticRepository->update($campaign);
//
return $campaign;
}
......@@ -293,11 +308,12 @@ class Manager
$campaign
->setRevokedTimestamp(time() * 1000);
$done = $this->repository->update($campaign);
// Write
if (!$done) {
throw new CampaignException('Cannot save campaign');
}
$this->repository->update($campaign);
$this->elasticRepository->update($campaign);
// Budget updates
$this->budgetDelegate->onStateChange($campaign);
......@@ -335,11 +351,12 @@ class Manager
$campaign
->setRejectedTimestamp(time() * 1000);
$done = $this->repository->update($campaign);
// Write
if (!$done) {
throw new CampaignException('Cannot save campaign');
}
$this->repository->update($campaign);
$this->elasticRepository->update($campaign);
// Budget update
$this->budgetDelegate->onStateChange($campaign);
......@@ -377,11 +394,10 @@ class Manager
$campaign
->setCompletedTimestamp(time() * 1000);
$done = $this->repository->update($campaign);
// Write
if (!$done) {
throw new CampaignException('Cannot save campaign');
}
$this->repository->update($campaign);
$this->elasticRepository->update($campaign);
return $campaign;
}
......
......@@ -6,106 +6,37 @@
namespace Minds\Core\Boost\Campaigns;
use Cassandra\Varint;
use Exception;
use Minds\Common\Repository\Response;
use Minds\Common\Urn;
use Minds\Core\Boost\Raw\RawBoost;
use Minds\Core\Boost\Raw\ElasticRepository as RawElasticRepository;
use Minds\Core\Boost\Raw\Repository as RawRepository;
use Minds\Core\Data\Cassandra\Client as CassandraClient;
use Minds\Core\Data\Cassandra\Prepared\Custom;
use Minds\Core\Di\Di;
use Minds\Core\EntitiesBuilder;
use Minds\Helpers\Text;
use NotImplementedException;
class Repository
{
/** @var RawElasticRepository $rawElasticRepository */
protected $rawElasticRepository;
/** @var RawRepository $rawRepository */
protected $rawRepository;
/** @var EntitiesBuilder */
protected $entitiesBuilder;
/** @var CassandraClient */
protected $db;
/**
* Repository constructor.
* @param EntitiesBuilder $entitiesBuilder
* @param RawRepository $rawRepository
* @param RawElasticRepository $rawElasticRepository
* @param CassandraClient $db
*/
public function __construct(
$entitiesBuilder = null,
$rawRepository = null,
$rawElasticRepository = null
$db = null
)
{
$this->entitiesBuilder = $entitiesBuilder ?: Di::_()->get('EntitiesBuilder');
$this->rawRepository = $rawRepository ?: new RawRepository();
$this->rawElasticRepository = $rawElasticRepository ?: new RawElasticRepository();
$this->db = $db ?: Di::_()->get('Database\Cassandra\Cql');
}
/**
* @param array $opts
* @return Response
* @throws NotImplementedException
*/
public function getList(array $opts = [])
{
$opts = array_merge([
'owner_guid' => null,
'offset' => '',
'limit' => 12,
'guid' => null,
], $opts);
$result = $this->rawElasticRepository->getList([
'is_campaign' => true,
'owner_guid' => $opts['owner_guid'],
'limit' => $opts['limit'],
'offset' => $opts['offset'],
'guid' => $opts['guid'],
'sort' => 'desc',
]);
return $result->map(function (RawBoost $rawBoost) {
$campaign = new Campaign();
// Entities
$entityUrns = array_map(function($entityUrn) {
if (is_numeric($entityUrn)) {
$entityUrn = "urn:entity:{$entityUrn}";
}
return $entityUrn;
}, Text::buildArray($rawBoost->getEntityUrns()));
// Hashtags
$tags = Text::buildArray($rawBoost->getTags());
// Campaign
$campaign
->setOwnerGuid($rawBoost->getOwnerGuid())
->setName($rawBoost->getCampaignName())
->setType($rawBoost->getType())
->setEntityUrns($entityUrns)
->setHashtags($tags)
->setStart($rawBoost->getCampaignStart())
->setEnd($rawBoost->getCampaignEnd())
->setBudget($rawBoost->getBid())
->setUrn("urn:campaign:{$rawBoost->getGuid()}")
->setImpressions($rawBoost->getImpressions())
->setImpressionsMet($rawBoost->getImpressionsMet())
->setCreatedTimestamp($rawBoost->getCreatedTimestamp())
->setReviewedTimestamp($rawBoost->getReviewedTimestamp())
->setRejectedTimestamp($rawBoost->getRejectedTimestamp())
->setRevokedTimestamp($rawBoost->getRevokedTimestamp())
->setCompletedTimestamp($rawBoost->getCompletedTimestamp());
return $campaign;
});
throw new NotImplementedException();
}
/**
......@@ -115,40 +46,39 @@ class Repository
*/
public function add(Campaign $campaign)
{
// TODO: Implement token method
$urn = new Urn($campaign->getUrn());
$guid = $urn->getNss();
// Raw Boost
$rawBoost = new RawBoost();
$rawBoost
->setOwnerGuid($campaign->getOwnerGuid())
->setCampaignName($campaign->getName())
->setType($campaign->getType())
->setEntityUrns($campaign->getEntityUrns())
->setTags($campaign->getHashtags())
->setCampaign(true)
->setCampaignStart($campaign->getStart())
->setCampaignEnd($campaign->getEnd())
->setBid($campaign->getBudget())
->setBidType('tokens')
->setGuid($guid)
->setImpressions($campaign->getImpressions())
->setRating(1)
->setPriority(true)
->setCreatedTimestamp($campaign->getCreatedTimestamp())
->setReviewedTimestamp($campaign->getReviewedTimestamp())
->setRejectedTimestamp($campaign->getRejectedTimestamp())
->setRevokedTimestamp($campaign->getRevokedTimestamp())
->setCompletedTimestamp($campaign->getCompletedTimestamp());
$cqlSave = $this->rawRepository->add($rawBoost);
$esSave = $this->rawElasticRepository->add($rawBoost);
return $cqlSave && $esSave;
$cql = "INSERT INTO boost_campaigns (type, guid, owner_guid, json_data, delivery_status) VALUES (?, ?, ?, ?, ?)";
$values = [
$campaign->getType(),
new Varint($campaign->getGuid()),
new Varint($campaign->getOwnerGuid()),
json_encode([
'urn' => $campaign->getUrn(),
'owner_guid' => (string) $campaign->getOwnerGuid(),
'name' => $campaign->getName(),
'type' => $campaign->getType(),
'entity_urns' => $campaign->getEntityUrns(),
'hashtags' => $campaign->getHashtags(),
'nsfw' => $campaign->getNsfw(),
'start' => $campaign->getStart(),
'end' => $campaign->getEnd(),
'budget' => $campaign->getBudget(),
'budget_type' => $campaign->getBudgetType(),
'checksum' => $campaign->getChecksum(),
'impressions' => $campaign->getImpressions(),
'impressions_met' => $campaign->getImpressionsMet(),
'created_timestamp' => $campaign->getCreatedTimestamp(),
'reviewed_timestamp' => $campaign->getReviewedTimestamp(),
'rejected_timestamp' => $campaign->getRejectedTimestamp(),
'revoked_timestamp' => $campaign->getRevokedTimestamp(),
'completed_timestamp' => $campaign->getCompletedTimestamp(),
]),
$campaign->getDeliveryStatus(),
];
$prepared = new Custom();
$prepared->query($cql, $values);
return (bool) $this->db->request($prepared, true);
}
/**
......
<?php
/**
* BoostDelivery
* @author edgebal
*/
namespace Minds\Core\Boost\Delivery;
use Minds\Traits\MagicAttributes;
/**
* Class BoostDelivery
* @package Minds\Core\Boost\Delivery
* @methj
*/
class BoostDelivery
{
use MagicAttributes;
protected $campaign;
}
<?php
/**
* Manager
* @author edgebal
*/
namespace Minds\Core\Boost\Delivery;
class Manager
{
}
<?php
/**
* Repository
* @author edgebal
*/
namespace Minds\Core\Boost\Delivery;
class Repository
{
public function getList(array $opts = [])
{
}
public function add(BoostDelivery $boostDelivery)
{
}
public function update(BoostDelivery $boostDelivery)
{
}
public function delete(BoostDelivery $boostDelivery)
{
}
}
......@@ -1406,4 +1406,20 @@ CREATE TABLE minds.update_markers (
read_timestamp timestamp,
updated_timestamp timestamp,
PRIMARY KEY (user_guid, entity_type, entity_guid, marker)
);
\ No newline at end of file
);
CREATE TABLE minds.boost_campaigns (
type text,
guid varint,
owner_guid varint,
json_data text,
delivery_status text,
PRIMARY KEY (type, guid)
) WITH CLUSTERING ORDER BY (guid ASC);
CREATE MATERIALIZED VIEW minds.boost_campaigns_by_owner AS
SELECT *
FROM minds.boost_campaigns
WHERE type IS NOT null AND owner_guid IS NOT null AND guid IS NOT null
PRIMARY KEY (type, owner_guid, guid)
WITH CLUSTERING ORDER BY (owner_guid ASC, guid DESC);
<?php
/**
* Number Helpers
*
* @author edgebal
*/
namespace Minds\Helpers;
class Number
{
/**
* @param mixed $value
* @return int[]
*/
public static function buildIntArray($value)
{
if (!$value) {
return [];
}
if (is_array($value)) {
return array_map('static::_buildIntArrayElement', $value);
}
return [ static::_buildIntArrayElement($value) ];
}
/**
* @param mixed $value
* @return string
*/
protected static function _buildIntArrayElement($value) {
return (int) $value;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment