Skip to content
Next
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Switch to GitLab Next
Sign in / Register
Toggle navigation
Minds Backend - Engine
Project
Project
Details
Activity
Releases
Dependency List
Cycle Analytics
Insights
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Locked Files
Issues
142
Issues
142
List
Boards
Labels
Service Desk
Milestones
Merge Requests
47
Merge Requests
47
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Registry
Registry
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Minds
Minds Backend - Engine
Compare Revisions
23c93fa497ffb0061fcf382be364f444285ebd78...9b79f9efe2ac0abe14ac8852bdb6541beed9bf35
Source
9b79f9efe2ac0abe14ac8852bdb6541beed9bf35
Select Git revision
...
Target
23c93fa497ffb0061fcf382be364f444285ebd78
Select Git revision
Compare
Commits (2)
(feat): Retry queue for ElasticSearch
· 5262cdb3
Emiliano Balbuena
authored
5 hours ago
5262cdb3
Merge branch 'sprint/ii.es-retry-580' into 'master'
· 9b79f9ef
Mark Harding
authored
5 hours ago
(feat): Retry queue for ElasticSearch See merge request
!250
9b79f9ef
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
598 additions
and
11 deletions
+598
-11
cassandra-provision.cql
Core/Provisioner/Provisioners/cassandra-provision.cql
+8
-1
Events.php
Core/Search/Events.php
+25
-7
Index.php
Core/Search/Index.php
+1
-1
Manager.php
Core/Search/RetryQueue/Manager.php
+84
-0
Repository.php
Core/Search/RetryQueue/Repository.php
+176
-0
RetryQueueEntry.php
Core/Search/RetryQueue/RetryQueueEntry.php
+30
-0
SearchProvider.php
Core/Search/SearchProvider.php
+5
-2
ManagerSpec.php
Spec/Core/Search/RetryQueue/ManagerSpec.php
+122
-0
RepositorySpec.php
Spec/Core/Search/RetryQueue/RepositorySpec.php
+147
-0
No files found.
Core/Provisioner/Provisioners/cassandra-provision.cql
View file @
9b79f9ef
...
...
@@ -1379,7 +1379,7 @@ CREATE TABLE minds.analytics_graphs (
last_synced timestamp,
data text,
PRIMARY KEY(key)
);
);
CREATE TABLE minds.views (
year int,
...
...
@@ -1415,6 +1415,13 @@ CREATE TABLE minds.hidden_hashtags (
PRIMARY KEY (hashtag)
);
CREATE TABLE minds.search_dispatcher_queue (
entity_urn text,
last_retry timestamp,
retries int,
PRIMARY KEY (entity_urn)
);
CREATE TABLE minds.user_snapshots (
user_guid varint,
type text,
...
...
This diff is collapsed.
Click to expand it.
Core/Search/Events.php
View file @
9b79f9ef
...
...
@@ -4,10 +4,10 @@
*/
namespace
Minds\Core\Search
;
use
Exception
;
use
Minds\Core
;
use
Minds\Core\Di\Di
;
use
Minds\Core\Events\Event
;
use
Minds\Entities
;
class
Events
{
...
...
@@ -79,12 +79,30 @@ class Events
return
;
}
Di
::
_
()
->
get
(
'Search\Index'
)
->
index
(
is_string
(
$params
[
'entity'
])
?
unserialize
(
$params
[
'entity'
])
:
$params
[
'entity'
]
);
$entity
=
is_string
(
$params
[
'entity'
])
?
unserialize
(
$params
[
'entity'
])
:
$params
[
'entity'
];
try
{
$wasIndexed
=
(
bool
)
Di
::
_
()
->
get
(
'Search\Index'
)
->
index
(
$entity
);
}
catch
(
Exception
$e
)
{
error_log
(
"[Search/Events/search:index:dispatch]
{
$e
}
"
);
$wasIndexed
=
false
;
}
// BannedException will return null (which is also falsy)
// So we should retry only on non-null responses from index()
if
(
$wasIndexed
!==
null
)
{
/** @var Core\Search\RetryQueue\Manager $retryQueueManager */
$retryQueueManager
=
Di
::
_
()
->
get
(
'Search\RetryQueue\Manager'
);
if
(
$wasIndexed
)
{
$retryQueueManager
->
prune
(
$entity
);
}
else
{
$retryQueueManager
->
retry
(
$entity
);
}
}
}
catch
(
\Exception
$e
)
{
error_log
(
'[Search/Events/search:index:dispatch] '
.
get_class
(
$e
)
.
': '
.
$e
->
getMessage
());
...
...
This diff is collapsed.
Click to expand it.
Core/Search/Index.php
View file @
9b79f9ef
...
...
@@ -93,7 +93,7 @@ class Index
}
}
}
catch
(
BannedException
$e
)
{
$result
=
false
;
$result
=
null
;
}
catch
(
\Exception
$e
)
{
error_log
(
'[Search/Index] '
.
get_class
(
$e
)
.
":
{
$e
->
getMessage
()
}
"
);
$result
=
false
;
...
...
This diff is collapsed.
Click to expand it.
Core/Search/RetryQueue/Manager.php
0 → 100644
View file @
9b79f9ef
<?php
/**
* RetryQueueManager
* @author edgebal
*/
namespace
Minds\Core\Search\RetryQueue
;
use
Exception
;
use
Minds\Common\Urn
;
use
Minds\Core\Di\Di
;
use
Minds\Core\Events\EventsDispatcher
;
class
Manager
{
/** @var EventsDispatcher */
protected
$eventsDispatcher
;
/** @var Repository */
protected
$repository
;
/**
* RetryQueueManager constructor.
* @param EventsDispatcher $eventsDispatcher
* @param Repository $repository
*/
public
function
__construct
(
$eventsDispatcher
=
null
,
$repository
=
null
)
{
$this
->
eventsDispatcher
=
$eventsDispatcher
?:
Di
::
_
()
->
get
(
'EventsDispatcher'
);
$this
->
repository
=
$repository
?:
new
Repository
();
}
/**
* @param mixed $entity
* @return bool
* @throws Exception
*/
public
function
prune
(
$entity
)
{
$urn
=
(
string
)
(
new
Urn
(
$entity
->
guid
));
$retryQueueEntry
=
new
RetryQueueEntry
();
$retryQueueEntry
->
setEntityUrn
(
$urn
);
return
(
bool
)
$this
->
repository
->
delete
(
$retryQueueEntry
);
}
/**
* @param mixed $entity
* @return bool
* @throws Exception
*/
public
function
retry
(
$entity
)
{
$urn
=
(
string
)
(
new
Urn
(
$entity
->
guid
));
$retryQueueEntry
=
$this
->
repository
->
get
(
$urn
);
$retries
=
$retryQueueEntry
->
getRetries
()
+
1
;
$retryQueueEntry
->
setLastRetry
(
time
())
->
setRetries
(
$retries
);
$retrySaved
=
$this
->
repository
->
add
(
$retryQueueEntry
);
if
(
!
$retrySaved
)
{
error_log
(
"[RetryQueueManager] Critical: Cannot save retry to queue table:
{
$urn
}
"
);
}
elseif
(
$retries
<
5
)
{
error_log
(
"[RetryQueueManager] Warn: Re-queue:
{
$urn
}
"
);
$this
->
eventsDispatcher
->
trigger
(
'search:index'
,
'all'
,
[
'entity'
=>
$entity
]);
}
else
{
error_log
(
"[RetryQueueManager] Critical: Too many retries indexing:
{
$urn
}
"
);
}
return
$retrySaved
;
}
}
This diff is collapsed.
Click to expand it.
Core/Search/RetryQueue/Repository.php
0 → 100644
View file @
9b79f9ef
<?php
/**
* Repository
* @author edgebal
*/
namespace
Minds\Core\Search\RetryQueue
;
use
Cassandra\Rows
;
use
Cassandra\Timestamp
;
use
Exception
;
use
Minds\Common\Repository\Response
;
use
Minds\Core\Data\Cassandra\Client
as
CassandraClient
;
use
Minds\Core\Data\Cassandra\Prepared\Custom
;
use
Minds\Core\Di\Di
;
class
Repository
{
/** @var CassandraClient */
protected
$db
;
public
function
__construct
(
$db
=
null
)
{
$this
->
db
=
$db
?:
Di
::
_
()
->
get
(
'Database\Cassandra\Cql'
);
}
/**
* @param array $opts
* @return Response
*/
public
function
getList
(
array
$opts
=
[])
{
$opts
=
array_merge
([
'entity_urn'
=>
null
,
],
$opts
);
$cql
=
"SELECT * FROM search_dispatcher_queue"
;
$values
=
[];
$cqlOpts
=
[];
if
(
$opts
[
'entity_urn'
])
{
$cql
.=
' WHERE entity_urn = ?'
;
$values
[]
=
$opts
[
'entity_urn'
];
}
if
(
$opts
[
'limit'
]
??
null
)
{
$cqlOpts
[
'page_size'
]
=
(
int
)
$opts
[
'limit'
];
}
if
(
$opts
[
'offset'
]
??
null
)
{
$cqlOpts
[
'paging_state_token'
]
=
base64_decode
(
$opts
[
'offset'
]);
}
$prepared
=
new
Custom
();
$prepared
->
query
(
$cql
,
$values
);
$prepared
->
setOpts
(
$cqlOpts
);
$response
=
new
Response
();
try
{
/** @var Rows $rows */
$rows
=
$this
->
db
->
request
(
$prepared
);
if
(
$rows
)
{
foreach
(
$rows
as
$row
)
{
$retryQueueEntry
=
new
RetryQueueEntry
();
$retryQueueEntry
->
setEntityUrn
(
$row
[
'entity_urn'
])
->
setLastRetry
(
$row
[
'last_retry'
]
->
time
())
->
setRetries
(
$row
[
'retries'
]);
$response
[]
=
$retryQueueEntry
;
}
$response
->
setPagingToken
(
base64_encode
(
$rows
->
pagingStateToken
()));
$response
->
setLastPage
(
$rows
->
isLastPage
());
}
}
catch
(
Exception
$e
)
{
error_log
(
$e
);
$response
->
setException
(
$e
);
}
return
$response
;
}
/**
* @param $urn
* @return RetryQueueEntry
*/
public
function
get
(
$urn
)
{
$retryQueueEntries
=
$this
->
getList
([
'entity_urn'
=>
$urn
,
])
->
toArray
();
if
(
count
(
$retryQueueEntries
))
{
return
$retryQueueEntries
[
0
];
}
else
{
$retryQueueEntry
=
new
RetryQueueEntry
();
$retryQueueEntry
->
setEntityUrn
(
$urn
)
->
setLastRetry
(
time
())
->
setRetries
(
0
);
return
$retryQueueEntry
;
}
}
/**
* @param RetryQueueEntry $retryQueueEntry
* @return bool
* @throws Exception
*/
public
function
add
(
RetryQueueEntry
$retryQueueEntry
)
{
if
(
!
$retryQueueEntry
->
getEntityUrn
())
{
throw
new
Exception
(
'Missing URN'
);
}
$cql
=
"INSERT INTO search_dispatcher_queue (entity_urn, last_retry, retries) VALUES (?, ?, ?)"
;
$values
=
[
(
string
)
$retryQueueEntry
->
getEntityUrn
(),
new
Timestamp
(
$retryQueueEntry
->
getLastRetry
()),
(
int
)
$retryQueueEntry
->
getRetries
(),
];
$prepared
=
new
Custom
();
$prepared
->
query
(
$cql
,
$values
);
try
{
return
(
bool
)
$this
->
db
->
request
(
$prepared
,
true
);
}
catch
(
Exception
$e
)
{
error_log
(
$e
);
return
false
;
}
}
/**
* @param RetryQueueEntry $retryQueueEntry
* @return bool
* @throws Exception
*/
public
function
update
(
RetryQueueEntry
$retryQueueEntry
)
{
return
$this
->
add
(
$retryQueueEntry
);
}
/**
* @param RetryQueueEntry $retryQueueEntry
* @return bool
* @throws Exception
*/
public
function
delete
(
RetryQueueEntry
$retryQueueEntry
)
{
if
(
!
$retryQueueEntry
->
getEntityUrn
())
{
throw
new
Exception
(
'Missing URN'
);
}
$cql
=
"DELETE FROM search_dispatcher_queue WHERE entity_urn = ?"
;
$values
=
[
(
string
)
$retryQueueEntry
->
getEntityUrn
(),
];
$prepared
=
new
Custom
();
$prepared
->
query
(
$cql
,
$values
);
try
{
return
(
bool
)
$this
->
db
->
request
(
$prepared
,
true
);
}
catch
(
Exception
$e
)
{
error_log
(
$e
);
return
false
;
}
}
}
This diff is collapsed.
Click to expand it.
Core/Search/RetryQueue/RetryQueueEntry.php
0 → 100644
View file @
9b79f9ef
<?php
/**
* RetryQueueEntry
* @author edgebal
*/
namespace
Minds\Core\Search\RetryQueue
;
use
Minds\Traits\MagicAttributes
;
/**
* Class RetryQueueEntry
* @package Minds\Core\Search\RetryQueue
* @method string getEntityUrn()
* @method RetryQueueEntry setEntityUrn(string $entityUrn)
* @method int getRetries()
* @method RetryQueueEntry setRetries(int $retries)
* @method int getLastRetry()
* @method RetryQueueEntry setLastRetry(int $entityUrn)
*/
class
RetryQueueEntry
{
use
MagicAttributes
;
protected
$entityUrn
;
protected
$retries
;
protected
$lastRetry
;
}
This diff is collapsed.
Click to expand it.
Core/Search/SearchProvider.php
View file @
9b79f9ef
...
...
@@ -9,7 +9,6 @@
namespace
Minds\Core\Search
;
use
Minds\Core\Di\Provider
;
use
Minds\Core\Search\Hashtags\Manager
;
class
SearchProvider
extends
Provider
{
...
...
@@ -40,7 +39,11 @@ class SearchProvider extends Provider
},
[
'useFactory'
=>
true
]);
$this
->
di
->
bind
(
'Search\Hashtags\Manager'
,
function
(
$di
)
{
return
new
Manager
();
return
new
Hashtags\Manager
();
},
[
'useFactory'
=>
true
]);
$this
->
di
->
bind
(
'Search\RetryQueue\Manager'
,
function
(
$di
)
{
return
new
RetryQueue\Manager
();
},
[
'useFactory'
=>
true
]);
}
}
This diff is collapsed.
Click to expand it.
Spec/Core/Search/RetryQueue/ManagerSpec.php
0 → 100644
View file @
9b79f9ef
<?php
namespace
Spec\Minds\Core\Search\RetryQueue
;
use
Minds\Core\Events\EventsDispatcher
;
use
Minds\Core\Search\RetryQueue\Manager
;
use
Minds\Core\Search\RetryQueue\Repository
;
use
Minds\Core\Search\RetryQueue\RetryQueueEntry
;
use
Minds\Entities\Entity
;
use
PhpSpec\ObjectBehavior
;
use
Prophecy\Argument
;
class
ManagerSpec
extends
ObjectBehavior
{
/** @var EventsDispatcher */
protected
$eventsDispatcher
;
/** @var Repository */
protected
$repository
;
function
let
(
EventsDispatcher
$eventsDispatcher
,
Repository
$repository
)
{
$this
->
beConstructedWith
(
$eventsDispatcher
,
$repository
);
$this
->
eventsDispatcher
=
$eventsDispatcher
;
$this
->
repository
=
$repository
;
}
function
it_is_initializable
()
{
$this
->
shouldHaveType
(
Manager
::
class
);
}
function
it_should_prune
(
Entity
$entity
)
{
$entity
->
get
(
'guid'
)
->
shouldBeCalled
()
->
willReturn
(
'5000'
);
$this
->
repository
->
delete
(
Argument
::
type
(
RetryQueueEntry
::
class
))
->
shouldBeCalled
()
->
willReturn
(
true
);
$this
->
prune
(
$entity
)
->
shouldReturn
(
true
);
}
function
it_should_retry
(
Entity
$entity
,
RetryQueueEntry
$retryQueueEntry
)
{
$entity
->
get
(
'guid'
)
->
shouldBeCalled
()
->
willReturn
(
'5000'
);
$this
->
repository
->
get
(
'urn:entity:5000'
)
->
shouldBeCalled
()
->
willReturn
(
$retryQueueEntry
);
$retryQueueEntry
->
getRetries
()
->
shouldBeCalled
()
->
willReturn
(
3
);
$retryQueueEntry
->
setLastRetry
(
Argument
::
type
(
'int'
))
->
shouldBeCalled
()
->
willReturn
(
$retryQueueEntry
);
$retryQueueEntry
->
setRetries
(
4
)
->
shouldBeCalled
()
->
willReturn
(
$retryQueueEntry
);
$this
->
repository
->
add
(
$retryQueueEntry
)
->
shouldBeCalled
()
->
willReturn
(
true
);
$this
->
eventsDispatcher
->
trigger
(
'search:index'
,
'all'
,
[
'entity'
=>
$entity
])
->
shouldBeCalled
()
->
willReturn
(
true
);
$this
->
retry
(
$entity
)
->
shouldReturn
(
true
);
}
function
it_should_not_retry_if_too_many_attempts
(
Entity
$entity
,
RetryQueueEntry
$retryQueueEntry
)
{
$entity
->
get
(
'guid'
)
->
shouldBeCalled
()
->
willReturn
(
'5000'
);
$this
->
repository
->
get
(
'urn:entity:5000'
)
->
shouldBeCalled
()
->
willReturn
(
$retryQueueEntry
);
$retryQueueEntry
->
getRetries
()
->
shouldBeCalled
()
->
willReturn
(
4
);
$retryQueueEntry
->
setLastRetry
(
Argument
::
type
(
'int'
))
->
shouldBeCalled
()
->
willReturn
(
$retryQueueEntry
);
$retryQueueEntry
->
setRetries
(
5
)
->
shouldBeCalled
()
->
willReturn
(
$retryQueueEntry
);
$this
->
repository
->
add
(
$retryQueueEntry
)
->
shouldBeCalled
()
->
willReturn
(
true
);
$this
->
eventsDispatcher
->
trigger
(
'search:index'
,
Argument
::
cetera
())
->
shouldNotBeCalled
();
$this
->
retry
(
$entity
)
->
shouldReturn
(
true
);
}
}
This diff is collapsed.
Click to expand it.
Spec/Core/Search/RetryQueue/RepositorySpec.php
0 → 100644
View file @
9b79f9ef
<?php
namespace
Spec\Minds\Core\Search\RetryQueue
;
use
Exception
;
use
Minds\Common\Repository\Response
;
use
Minds\Core\Data\Cassandra\Client
as
CassandraClient
;
use
Minds\Core\Data\Cassandra\Prepared\Custom
;
use
Minds\Core\Search\RetryQueue\Repository
;
use
Minds\Core\Search\RetryQueue\RetryQueueEntry
;
use
PhpSpec\ObjectBehavior
;
use
Prophecy\Argument
;
use
Spec\Minds\Mocks\Cassandra\Rows
;
class
RepositorySpec
extends
ObjectBehavior
{
/** @var CassandraClient */
protected
$db
;
function
let
(
CassandraClient
$db
)
{
$this
->
beConstructedWith
(
$db
);
$this
->
db
=
$db
;
}
function
it_is_initializable
()
{
$this
->
shouldHaveType
(
Repository
::
class
);
}
function
it_should_get_a_list
()
{
$this
->
db
->
request
(
Argument
::
that
(
function
(
Custom
$prepared
)
{
$query
=
$prepared
->
build
();
return
$query
[
'string'
]
==
'SELECT * FROM search_dispatcher_queue'
&&
$query
[
'values'
]
===
[];
}))
->
shouldBeCalled
()
->
willReturn
(
new
Rows
([],
'1a2b3c4d5e6f7890'
));
$this
->
getList
([])
->
shouldReturnAnInstanceOf
(
Response
::
class
);
}
function
it_should_get_by_a_list_by_urn
()
{
$this
->
db
->
request
(
Argument
::
that
(
function
(
Custom
$prepared
)
{
$query
=
$prepared
->
build
();
return
$query
[
'string'
]
==
'SELECT * FROM search_dispatcher_queue WHERE entity_urn = ?'
&&
$query
[
'values'
]
===
[
'urn:test:123456'
];
}))
->
shouldBeCalled
()
->
willReturn
(
new
Rows
([
[
'entity_urn'
=>
'urn:test:123456'
,
'last_retry'
=>
new
\Cassandra\Timestamp
(
1562182542
),
'retries'
=>
3
,
]
],
'1a2b3c4d5e6f7890'
));
$this
->
getList
([
'entity_urn'
=>
'urn:test:123456'
,
])
->
shouldReturnAnInstanceOf
(
Response
::
class
);
}
function
it_should_add
(
RetryQueueEntry
$retryQueueEntry
)
{
$retryQueueEntry
->
getEntityUrn
()
->
shouldBeCalled
()
->
willReturn
(
'urn:test:123456'
);
$retryQueueEntry
->
getLastRetry
()
->
shouldBeCalled
()
->
willReturn
(
1562182542
);
$retryQueueEntry
->
getRetries
()
->
shouldBeCalled
()
->
willReturn
(
3
);
$this
->
db
->
request
(
Argument
::
that
(
function
(
Custom
$prepared
)
{
$query
=
$prepared
->
build
();
return
stripos
(
$query
[
'string'
],
'INSERT INTO search_dispatcher_queue'
)
===
0
;
}),
true
)
->
shouldBeCalled
()
->
willReturn
(
true
);
$this
->
add
(
$retryQueueEntry
)
->
shouldNotReturn
(
false
);
}
function
it_should_throw_during_add_if_no_entity_urn
(
RetryQueueEntry
$retryQueueEntry
)
{
$retryQueueEntry
->
getEntityUrn
()
->
shouldBeCalled
()
->
willReturn
(
null
);
$this
->
db
->
request
(
Argument
::
cetera
())
->
shouldNotBeCalled
();
$this
->
shouldThrow
(
new
Exception
(
'Missing URN'
))
->
duringAdd
(
$retryQueueEntry
);
}
function
it_should_delete
(
RetryQueueEntry
$retryQueueEntry
)
{
$retryQueueEntry
->
getEntityUrn
()
->
shouldBeCalled
()
->
willReturn
(
'urn:test:123456'
);
$this
->
db
->
request
(
Argument
::
that
(
function
(
Custom
$prepared
)
{
$query
=
$prepared
->
build
();
return
stripos
(
$query
[
'string'
],
'DELETE FROM search_dispatcher_queue'
)
===
0
;
}),
true
)
->
shouldBeCalled
()
->
willReturn
(
true
);
$this
->
delete
(
$retryQueueEntry
)
->
shouldNotReturn
(
false
);
}
function
it_should_throw_during_delete_if_no_entity_urn
(
RetryQueueEntry
$retryQueueEntry
)
{
$retryQueueEntry
->
getEntityUrn
()
->
shouldBeCalled
()
->
willReturn
(
null
);
$this
->
db
->
request
(
Argument
::
cetera
())
->
shouldNotBeCalled
();
$this
->
shouldThrow
(
new
Exception
(
'Missing URN'
))
->
duringDelete
(
$retryQueueEntry
);
}
}
This diff is collapsed.
Click to expand it.