Skip to content

Synchronous standbys #46

New issue

Have a question about this project? Sign up for a free account to open an issue and contact its maintainers and the community.

By clicking “Sign up for ”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on ? Sign in to your account

Merged
merged 8 commits into from
Aug 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/source/testgres.rst
Original file line numberDiff line numberDiff line change
Expand Up@@ -61,6 +61,14 @@ testgres.node
.. autoclass:: testgres.node.ProcessProxy
:members:

testgres.standby
----------------

.. automodule:: testgres.standby
:members:
:undoc-members:
:show-inheritance:

testgres.pubsub
---------------

Expand Down
4 changes: 4 additions & 0 deletions testgres/__init__.py
Original file line numberDiff line numberDiff line change
Expand Up@@ -24,3 +24,7 @@
get_bin_path, \
get_pg_config, \
get_pg_version

from .standby import \
First, \
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO those names are too generic to be exported at top level.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think names are ok but should be used with the of module, like standby.First

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ildus I agree.

Any
44 changes: 43 additions & 1 deletion testgres/node.py
Original file line numberDiff line numberDiff line change
Expand Up@@ -6,6 +6,7 @@
import subprocess
import time

from collections import Iterable
from shutil import rmtree
from six import raise_from, iteritems
from tempfile import mkstemp, mkdtemp
Expand DownExpand Up@@ -65,6 +66,8 @@

from .pubsub import Publication, Subscription

from .standby import First

from .utils import \
PgVer, \
eprint, \
Expand DownExpand Up@@ -671,7 +674,7 @@ def restart(self, params=[]):

def reload(self, params=[]):
"""
Reload config files using pg_ctl.
Asynchronously reload config files using pg_ctl.

Args:
params: additional arguments for pg_ctl.
Expand DownExpand Up@@ -1036,6 +1039,45 @@ def replicate(self, name=None, slot=None, **kwargs):
with clean_on_error(self.backup(**kwargs)) as backup:
return backup.spawn_replica(name=name, destroy=True, slot=slot)

def set_synchronous_standbys(self, standbys):
"""
Set standby synchronization options. This corresponds to
`synchronous_standby_names <https://www.postgresql.org/docs/current/static/runtime-config-replication.html#GUC-SYNCHRONOUS-STANDBY-NAMES>`_
option. Note that :meth:`~.PostgresNode.reload` or
:meth:`~.PostgresNode.restart` is needed for changes to take place.

Args:
standbys: either :class:`.First` or :class:`.Any` object specifying
sychronization parameters or just a plain list of
:class:`.PostgresNode`s replicas which would be equivalent
to passing ``First(1, <list>)``. For PostgreSQL 9.5 and below
it is only possible to specify a plain list of standbys as
`FIRST` and `ANY` keywords aren't supported.

Example::

from testgres import get_new_node, First

master = get_new_node().init().start()
with master.replicate().start() as standby:
master.append_conf("synchronous_commit = remote_apply")
master.set_synchronous_standbys(First(1, [standby]))
master.restart()

"""
if self._pg_version >= '9.6':
if isinstance(standbys, Iterable):
standbys = First(1, standbys)
else:
if isinstance(standbys, Iterable):
standbys = u", ".join(
u"\"{}\"".format(r.name) for r in standbys)
else:
raise TestgresException("Feature isn't supported in "
"Postgres 9.5 and below")

self.append_conf("synchronous_standby_names = '{}'".format(standbys))

def catchup(self, dbname=None, username=None):
"""
Wait until async replica catches up with its master.
Expand Down
49 changes: 49 additions & 0 deletions testgres/standby.py
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
# coding: utf-8

import six


@six.python_2_unicode_compatible
class First:
"""
Specifies a priority-based synchronous replication and makes transaction
commits wait until their WAL records are replicated to ``num_sync``
synchronous standbys chosen based on their priorities.

Args:
sync_num (int): the number of standbys that transaction need to wait
for replies from
standbys (:obj:`list` of :class:`.PostgresNode`): the list of standby
nodes
"""

def __init__(self, sync_num, standbys):
self.sync_num = sync_num
self.standbys = standbys

def __str__(self):
return u"{} ({})".format(self.sync_num, u", ".join(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

тут мне кажется лучше добавить ключевое слово first, чтобы было явно

u"\"{}\"".format(r.name) for r in self.standbys))


@six.python_2_unicode_compatible
class Any:
"""
Specifies a quorum-based synchronous replication and makes transaction
commits wait until their WAL records are replicated to at least ``num_sync``
listed standbys. Only available for Postgres 10 and newer.

Args:
sync_num (int): the number of standbys that transaction need to wait
for replies from
standbys (:obj:`list` of :class:`.PostgresNode`): the list of standby
nodes
"""

def __init__(self, sync_num, standbys):
self.sync_num = sync_num
self.standbys = standbys

def __str__(self):
return u"ANY {} ({})".format(self.sync_num, u", ".join(
u"\"{}\"".format(r.name) for r in self.standbys))
44 changes: 44 additions & 0 deletions tests/test_simple.py
Original file line numberDiff line numberDiff line change
Expand Up@@ -40,6 +40,10 @@
get_pg_config, \
get_pg_version

from testgres import \
First, \
Any

# NOTE: those are ugly imports
from testgres import bound_ports
from testgres.utils import PgVer
Expand DownExpand Up@@ -409,6 +413,46 @@ def test_replicate(self):
res = node.execute('select * from test')
self.assertListEqual(res, [])

def test_synchronous_replication(self):
with get_new_node() as master:
old_version = not pg_version_ge('9.6')

master.init(allow_=True).start()

if not old_version:
master.append_conf('synchronous_commit = remote_apply')

# create standby
with master.replicate() as standby1, master.replicate() as standby2:
standby1.start()
standby2.start()

# check formatting
self.assertEqual(
'1 ("{}", "{}")'.format(standby1.name, standby2.name),
str(First(1, (standby1, standby2)))) # yapf: disable
self.assertEqual(
'ANY 1 ("{}", "{}")'.format(standby1.name, standby2.name),
str(Any(1, (standby1, standby2)))) # yapf: disable

# set synchronous_standby_names
master.set_synchronous_standbys([standby1, standby2])
master.restart()

# the following part of the test is only applicable to newer
# versions of PostgresQL
if not old_version:
master.safe_psql('create table abc(a int)')

# Create a large transaction that will take some time to apply
# on standby to check that it applies synchronously
# (If set synchronous_commit to 'on' or other lower level then
# standby most likely won't catchup so fast and test will fail)
master.safe_psql(
'insert into abc select generate_series(1, 1000000)')
res = standby1.safe_psql('select count(*) from abc')
self.assertEqual(res, b'1000000\n')

@unittest.skipUnless(pg_version_ge('10'), 'requires 10+')
def test_logical_replication(self):
with get_new_node() as node1, get_new_node() as node2:
Expand Down