...
 
Commits (3)
......@@ -1379,7 +1379,7 @@ CREATE TABLE minds.analytics_graphs (
last_synced timestamp,
data text,
PRIMARY KEY(key)
);
);
CREATE TABLE minds.views (
year int,
......@@ -1415,6 +1415,13 @@ CREATE TABLE minds.hidden_hashtags (
PRIMARY KEY (hashtag)
);
CREATE TABLE minds.search_dispatcher_queue (
entity_urn text,
last_retry timestamp,
retries int,
PRIMARY KEY (entity_urn)
);
CREATE TABLE minds.user_snapshots (
user_guid varint,
type text,
......
......@@ -4,10 +4,10 @@
*/
namespace Minds\Core\Search;
use Exception;
use Minds\Core;
use Minds\Core\Di\Di;
use Minds\Core\Events\Event;
use Minds\Entities;
class Events
{
......@@ -79,12 +79,30 @@ class Events
return;
}
Di::_()->get('Search\Index')
->index(
is_string($params['entity']) ?
unserialize($params['entity']) :
$params['entity']
);
$entity = is_string($params['entity']) ?
unserialize($params['entity']) :
$params['entity'];
try {
$wasIndexed = (bool) Di::_()->get('Search\Index')
->index($entity);
} catch (Exception $e) {
error_log("[Search/Events/search:index:dispatch] {$e}");
$wasIndexed = false;
}
// BannedException will return null (which is also falsy)
// So we should retry only on non-null responses from index()
if ($wasIndexed !== null) {
/** @var Core\Search\RetryQueue\Manager $retryQueueManager */
$retryQueueManager = Di::_()->get('Search\RetryQueue\Manager');
if ($wasIndexed) {
$retryQueueManager->prune($entity);
} else {
$retryQueueManager->retry($entity);
}
}
} catch (\Exception $e) {
error_log('[Search/Events/search:index:dispatch] ' . get_class($e) . ': ' . $e->getMessage());
......
......@@ -93,7 +93,7 @@ class Index
}
}
} catch (BannedException $e) {
$result = false;
$result = null;
} catch (\Exception $e) {
error_log('[Search/Index] ' . get_class($e) . ": {$e->getMessage()}");
$result = false;
......
<?php
/**
* RetryQueueManager
* @author edgebal
*/
namespace Minds\Core\Search\RetryQueue;
use Exception;
use Minds\Common\Urn;
use Minds\Core\Di\Di;
use Minds\Core\Events\EventsDispatcher;
class Manager
{
/** @var EventsDispatcher */
protected $eventsDispatcher;
/** @var Repository */
protected $repository;
/**
* RetryQueueManager constructor.
* @param EventsDispatcher $eventsDispatcher
* @param Repository $repository
*/
public function __construct(
$eventsDispatcher = null,
$repository = null
)
{
$this->eventsDispatcher = $eventsDispatcher ?: Di::_()->get('EventsDispatcher');
$this->repository = $repository ?: new Repository();
}
/**
* @param mixed $entity
* @return bool
* @throws Exception
*/
public function prune($entity)
{
$urn = (string) (new Urn($entity->guid));
$retryQueueEntry = new RetryQueueEntry();
$retryQueueEntry
->setEntityUrn($urn);
return (bool) $this->repository->delete($retryQueueEntry);
}
/**
* @param mixed $entity
* @return bool
* @throws Exception
*/
public function retry($entity)
{
$urn = (string) (new Urn($entity->guid));
$retryQueueEntry = $this->repository->get($urn);
$retries = $retryQueueEntry->getRetries() + 1;
$retryQueueEntry
->setLastRetry(time())
->setRetries($retries);
$retrySaved = $this->repository->add($retryQueueEntry);
if (!$retrySaved) {
error_log("[RetryQueueManager] Critical: Cannot save retry to queue table: {$urn}");
} elseif ($retries < 5) {
error_log("[RetryQueueManager] Warn: Re-queue: {$urn}");
$this->eventsDispatcher->trigger('search:index', 'all', [
'entity' => $entity
]);
} else {
error_log("[RetryQueueManager] Critical: Too many retries indexing: {$urn}");
}
return $retrySaved;
}
}
<?php
/**
* Repository
* @author edgebal
*/
namespace Minds\Core\Search\RetryQueue;
use Cassandra\Rows;
use Cassandra\Timestamp;
use Exception;
use Minds\Common\Repository\Response;
use Minds\Core\Data\Cassandra\Client as CassandraClient;
use Minds\Core\Data\Cassandra\Prepared\Custom;
use Minds\Core\Di\Di;
class Repository
{
/** @var CassandraClient */
protected $db;
public function __construct(
$db = null
)
{
$this->db = $db ?: Di::_()->get('Database\Cassandra\Cql');
}
/**
* @param array $opts
* @return Response
*/
public function getList(array $opts = [])
{
$opts = array_merge([
'entity_urn' => null,
], $opts);
$cql = "SELECT * FROM search_dispatcher_queue";
$values = [];
$cqlOpts = [];
if ($opts['entity_urn']) {
$cql .= ' WHERE entity_urn = ?';
$values[] = $opts['entity_urn'];
}
if ($opts['limit'] ?? null) {
$cqlOpts['page_size'] = (int) $opts['limit'];
}
if ($opts['offset'] ?? null) {
$cqlOpts['paging_state_token'] = base64_decode($opts['offset']);
}
$prepared = new Custom();
$prepared->query($cql, $values);
$prepared->setOpts($cqlOpts);
$response = new Response();
try {
/** @var Rows $rows */
$rows = $this->db->request($prepared);
if ($rows) {
foreach ($rows as $row) {
$retryQueueEntry = new RetryQueueEntry();
$retryQueueEntry
->setEntityUrn($row['entity_urn'])
->setLastRetry($row['last_retry']->time())
->setRetries($row['retries']);
$response[] = $retryQueueEntry;
}
$response->setPagingToken(base64_encode($rows->pagingStateToken()));
$response->setLastPage($rows->isLastPage());
}
} catch (Exception $e) {
error_log($e);
$response->setException($e);
}
return $response;
}
/**
* @param $urn
* @return RetryQueueEntry
*/
public function get($urn)
{
$retryQueueEntries = $this->getList([
'entity_urn' => $urn,
])->toArray();
if (count($retryQueueEntries)) {
return $retryQueueEntries[0];
} else {
$retryQueueEntry = new RetryQueueEntry();
$retryQueueEntry
->setEntityUrn($urn)
->setLastRetry(time())
->setRetries(0);
return $retryQueueEntry;
}
}
/**
* @param RetryQueueEntry $retryQueueEntry
* @return bool
* @throws Exception
*/
public function add(RetryQueueEntry $retryQueueEntry)
{
if (!$retryQueueEntry->getEntityUrn()) {
throw new Exception('Missing URN');
}
$cql = "INSERT INTO search_dispatcher_queue (entity_urn, last_retry, retries) VALUES (?, ?, ?)";
$values = [
(string) $retryQueueEntry->getEntityUrn(),
new Timestamp($retryQueueEntry->getLastRetry()),
(int) $retryQueueEntry->getRetries(),
];
$prepared = new Custom();
$prepared->query($cql, $values);
try {
return (bool) $this->db->request($prepared, true);
} catch (Exception $e) {
error_log($e);
return false;
}
}
/**
* @param RetryQueueEntry $retryQueueEntry
* @return bool
* @throws Exception
*/
public function update(RetryQueueEntry $retryQueueEntry)
{
return $this->add($retryQueueEntry);
}
/**
* @param RetryQueueEntry $retryQueueEntry
* @return bool
* @throws Exception
*/
public function delete(RetryQueueEntry $retryQueueEntry)
{
if (!$retryQueueEntry->getEntityUrn()) {
throw new Exception('Missing URN');
}
$cql = "DELETE FROM search_dispatcher_queue WHERE entity_urn = ?";
$values = [
(string) $retryQueueEntry->getEntityUrn(),
];
$prepared = new Custom();
$prepared->query($cql, $values);
try {
return (bool) $this->db->request($prepared, true);
} catch (Exception $e) {
error_log($e);
return false;
}
}
}
<?php
/**
* RetryQueueEntry
* @author edgebal
*/
namespace Minds\Core\Search\RetryQueue;
use Minds\Traits\MagicAttributes;
/**
* Class RetryQueueEntry
* @package Minds\Core\Search\RetryQueue
* @method string getEntityUrn()
* @method RetryQueueEntry setEntityUrn(string $entityUrn)
* @method int getRetries()
* @method RetryQueueEntry setRetries(int $retries)
* @method int getLastRetry()
* @method RetryQueueEntry setLastRetry(int $entityUrn)
*/
class RetryQueueEntry
{
use MagicAttributes;
protected $entityUrn;
protected $retries;
protected $lastRetry;
}
......@@ -9,7 +9,6 @@
namespace Minds\Core\Search;
use Minds\Core\Di\Provider;
use Minds\Core\Search\Hashtags\Manager;
class SearchProvider extends Provider
{
......@@ -40,7 +39,11 @@ class SearchProvider extends Provider
}, ['useFactory' => true]);
$this->di->bind('Search\Hashtags\Manager', function ($di) {
return new Manager();
return new Hashtags\Manager();
}, ['useFactory' => true]);
$this->di->bind('Search\RetryQueue\Manager', function ($di) {
return new RetryQueue\Manager();
}, ['useFactory' => true]);
}
}
<?php
namespace Spec\Minds\Core\Search\RetryQueue;
use Minds\Core\Events\EventsDispatcher;
use Minds\Core\Search\RetryQueue\Manager;
use Minds\Core\Search\RetryQueue\Repository;
use Minds\Core\Search\RetryQueue\RetryQueueEntry;
use Minds\Entities\Entity;
use PhpSpec\ObjectBehavior;
use Prophecy\Argument;
class ManagerSpec extends ObjectBehavior
{
/** @var EventsDispatcher */
protected $eventsDispatcher;
/** @var Repository */
protected $repository;
function let(
EventsDispatcher $eventsDispatcher,
Repository $repository
)
{
$this->beConstructedWith($eventsDispatcher, $repository);
$this->eventsDispatcher = $eventsDispatcher;
$this->repository = $repository;
}
function it_is_initializable()
{
$this->shouldHaveType(Manager::class);
}
function it_should_prune(Entity $entity)
{
$entity->get('guid')
->shouldBeCalled()
->willReturn('5000');
$this->repository->delete(Argument::type(RetryQueueEntry::class))
->shouldBeCalled()
->willReturn(true);
$this
->prune($entity)
->shouldReturn(true);
}
function it_should_retry(Entity $entity, RetryQueueEntry $retryQueueEntry)
{
$entity->get('guid')
->shouldBeCalled()
->willReturn('5000');
$this->repository->get('urn:entity:5000')
->shouldBeCalled()
->willReturn($retryQueueEntry);
$retryQueueEntry->getRetries()
->shouldBeCalled()
->willReturn(3);
$retryQueueEntry->setLastRetry(Argument::type('int'))
->shouldBeCalled()
->willReturn($retryQueueEntry);
$retryQueueEntry->setRetries(4)
->shouldBeCalled()
->willReturn($retryQueueEntry);
$this->repository->add($retryQueueEntry)
->shouldBeCalled()
->willReturn(true);
$this->eventsDispatcher->trigger('search:index', 'all', [
'entity' => $entity
])
->shouldBeCalled()
->willReturn(true);
$this
->retry($entity)
->shouldReturn(true);
}
function it_should_not_retry_if_too_many_attempts(Entity $entity, RetryQueueEntry $retryQueueEntry)
{
$entity->get('guid')
->shouldBeCalled()
->willReturn('5000');
$this->repository->get('urn:entity:5000')
->shouldBeCalled()
->willReturn($retryQueueEntry);
$retryQueueEntry->getRetries()
->shouldBeCalled()
->willReturn(4);
$retryQueueEntry->setLastRetry(Argument::type('int'))
->shouldBeCalled()
->willReturn($retryQueueEntry);
$retryQueueEntry->setRetries(5)
->shouldBeCalled()
->willReturn($retryQueueEntry);
$this->repository->add($retryQueueEntry)
->shouldBeCalled()
->willReturn(true);
$this->eventsDispatcher->trigger('search:index', Argument::cetera())
->shouldNotBeCalled();
$this
->retry($entity)
->shouldReturn(true);
}
}
<?php
namespace Spec\Minds\Core\Search\RetryQueue;
use Exception;
use Minds\Common\Repository\Response;
use Minds\Core\Data\Cassandra\Client as CassandraClient;
use Minds\Core\Data\Cassandra\Prepared\Custom;
use Minds\Core\Search\RetryQueue\Repository;
use Minds\Core\Search\RetryQueue\RetryQueueEntry;
use PhpSpec\ObjectBehavior;
use Prophecy\Argument;
use Spec\Minds\Mocks\Cassandra\Rows;
class RepositorySpec extends ObjectBehavior
{
/** @var CassandraClient */
protected $db;
function let(
CassandraClient $db
)
{
$this->beConstructedWith($db);
$this->db = $db;
}
function it_is_initializable()
{
$this->shouldHaveType(Repository::class);
}
function it_should_get_a_list()
{
$this->db->request(Argument::that(function (Custom $prepared) {
$query = $prepared->build();
return $query['string'] == 'SELECT * FROM search_dispatcher_queue' &&
$query['values'] === [];
}))
->shouldBeCalled()
->willReturn(new Rows([], '1a2b3c4d5e6f7890'));
$this
->getList([])
->shouldReturnAnInstanceOf(Response::class);
}
function it_should_get_by_a_list_by_urn()
{
$this->db->request(Argument::that(function (Custom $prepared) {
$query = $prepared->build();
return $query['string'] == 'SELECT * FROM search_dispatcher_queue WHERE entity_urn = ?' &&
$query['values'] === ['urn:test:123456'];
}))
->shouldBeCalled()
->willReturn(new Rows([
[
'entity_urn' => 'urn:test:123456',
'last_retry' => new \Cassandra\Timestamp(1562182542),
'retries' => 3,
]
], '1a2b3c4d5e6f7890'));
$this
->getList([
'entity_urn' => 'urn:test:123456',
])
->shouldReturnAnInstanceOf(Response::class);
}
function it_should_add(RetryQueueEntry $retryQueueEntry)
{
$retryQueueEntry->getEntityUrn()
->shouldBeCalled()
->willReturn('urn:test:123456');
$retryQueueEntry->getLastRetry()
->shouldBeCalled()
->willReturn(1562182542);
$retryQueueEntry->getRetries()
->shouldBeCalled()
->willReturn(3);
$this->db->request(Argument::that(function (Custom $prepared) {
$query = $prepared->build();
return stripos($query['string'], 'INSERT INTO search_dispatcher_queue') === 0;
}), true)
->shouldBeCalled()
->willReturn(true);
$this
->add($retryQueueEntry)
->shouldNotReturn(false);
}
function it_should_throw_during_add_if_no_entity_urn(RetryQueueEntry $retryQueueEntry)
{
$retryQueueEntry->getEntityUrn()
->shouldBeCalled()
->willReturn(null);
$this->db->request(Argument::cetera())
->shouldNotBeCalled();
$this
->shouldThrow(new Exception('Missing URN'))
->duringAdd($retryQueueEntry);
}
function it_should_delete(RetryQueueEntry $retryQueueEntry)
{
$retryQueueEntry->getEntityUrn()
->shouldBeCalled()
->willReturn('urn:test:123456');
$this->db->request(Argument::that(function (Custom $prepared) {
$query = $prepared->build();
return stripos($query['string'], 'DELETE FROM search_dispatcher_queue') === 0;
}), true)
->shouldBeCalled()
->willReturn(true);
$this
->delete($retryQueueEntry)
->shouldNotReturn(false);
}
function it_should_throw_during_delete_if_no_entity_urn(RetryQueueEntry $retryQueueEntry)
{
$retryQueueEntry->getEntityUrn()
->shouldBeCalled()
->willReturn(null);
$this->db->request(Argument::cetera())
->shouldNotBeCalled();
$this
->shouldThrow(new Exception('Missing URN'))
->duringDelete($retryQueueEntry);
}
}