CouchDB style sync and conflict resolution on Postgres with Hasura

We’ve been talking about offline-first with Hasura and RxDB (essentially Postgres and PouchDB underneath).

This post continues to dive deeper into the topic. It is a discussion and guide to implementing CouchDB style conflict resolution with Postgres (central backend database) and PouchDB (frontend app user database).

Let us take a Trello board as an example. Let's say you've changed the assignee on a Trello card while offline. In the meantime, your colleague edits the description of the same card. When you come back online, you would want to see both changes. Now suppose both of you changed the description at the same time: what should happen in this case? One option is to simply take the last write, that is, overwrite the earlier change with the new one. Another is to notify the user and let them update the card with a merged field (like git!).

This aspect of taking multiple simultaneous changes (which may be conflicting), and merging them into one change is called conflict resolution.

Replication and conflict resolution infrastructure is painful to build into the frontend and backend of an application. But once it is set up, some important use-cases become viable! In fact, for certain kinds of applications, replication (and hence conflict resolution) is critical to the app's functionality!

  1. Realtime: Changes made by the users on different devices are synced with each other
  2. Collaborative: Different users simultaneously work on the same data
  3. Offline-first: The same user can work with their data even while the app is not connected to the central database

Examples: Trello, email clients like Gmail and Superhuman, Google Docs, Facebook, Twitter, etc.

Hasura makes it super easy to add high-performance, secure, realtime capabilities to your existing Postgres based application. There is no need to deploy additional backend infrastructure for supporting these use-cases! In the next few sections we'll learn how you can use PouchDB/RxDB on the frontend and pair it with Hasura to build powerful apps with great user-experience.

Version management with PouchDB

PouchDB - which RxDB uses underneath - comes with a powerful versioning and conflict management mechanism. Every document in PouchDB has a version field (_rev) associated with it. Version fields are of the form <depth>-<object-hash>, for example 2-c1592ce7b31cc26e91d2f2029c57e621. Here depth indicates the depth in the revision tree. The object hash is an opaque 32-character hex string; depending on the PouchDB version and configuration it is either randomly generated or derived from the document's contents.

A sneak peek into PouchDB revisions

PouchDB exposes APIs to fetch the revision history of a document. We can query the revision history this way:

todos.pouch.get(todo.id, {
    revs: true
})

This will return a document containing a _revisions field:

{
  "id": "559da26d-ad0f-42bc-a172-1821641bf2bb",
  "_rev": "4-95162faab173d1e748952179e0db1a53",
  "_revisions": {
    "ids": [
      "95162faab173d1e748952179e0db1a53",
      "94162faab173d1e748952179e0db1a53",
      "9055e63d99db056a95b61936f0185c8c",
      "de71900ec14567088bed5914b2439896"
    ],
    "start": 4
  }
}

Here ids contains the hashes of the document's revision history, newest first (including the current revision), and start contains the "prefix number" (the depth) of the current revision. Every time a new revision is added, start is incremented and a new hash is added to the start of the ids array.
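To make the relationship between _rev and _revisions concrete, here is a minimal sketch that expands a _revisions object into full revision ids. The helper name expandRevisions is hypothetical (it is not part of PouchDB or RxDB); the input is the example document shown above.

```javascript
// Hypothetical helper: expand a PouchDB-style `_revisions` object into
// full "<depth>-<hash>" revision ids, newest first.
function expandRevisions(revisions) {
    const { start, ids } = revisions;
    // ids[0] belongs to depth `start`, ids[1] to `start - 1`, and so on.
    return ids.map((hash, index) => `${start - index}-${hash}`);
}

const history = expandRevisions({
    start: 4,
    ids: [
        '95162faab173d1e748952179e0db1a53',
        '94162faab173d1e748952179e0db1a53',
        '9055e63d99db056a95b61936f0185c8c',
        'de71900ec14567088bed5914b2439896'
    ]
});

console.log(history[0]); // 4-95162faab173d1e748952179e0db1a53 (the current _rev)
console.log(history[1]); // 3-94162faab173d1e748952179e0db1a53 (its parent)
```

The first entry always matches the document's _rev, and the second entry (when present) is the parent revision.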

When a document is synced to a remote server, _revisions and _rev fields need to be included. This way all clients eventually have the complete version history. This happens automatically when PouchDB is set up to sync with CouchDB. The above pull request enables this when syncing via GraphQL as well.

Note that all clients do not necessarily have all the revisions, but all of them will eventually have the latest versions and the history of the revision ids for these versions.

Conflict resolution

A conflict will be detected if two revisions have the same parent or more simply if any two revisions have the same depth. When a conflict is detected, CouchDB & PouchDB will use the same algorithm to auto pick a winner:

  1. Select revisions with the highest depth field that are not marked as deleted
  2. If there is only 1 such revision, treat it as the winner
  3. If there is more than 1, sort the revision fields in descending order and pick the first.

A note about deletion: PouchDB & CouchDB never delete revisions or documents; instead, a new revision is created with a _deleted flag set to true. So in step 1 of the above algorithm any chains that end with a revision marked as deleted are ignored.

One nice feature of this algorithm is that there is no coordination required between clients or the client and the server to resolve a conflict. There is no additional marker required to mark a version as winning either. Each client and the server independently pick the winner. But the winner will be the same revision because they use the same deterministic algorithm. Even if one of the clients has some revisions missing, eventually when those revisions are synced, the same revision gets picked as the winner.
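The three steps above can be sketched in a few lines of JavaScript. This is an illustration of the algorithm as described in this post, not PouchDB's actual implementation: pickWinner is a hypothetical helper, the input is assumed to be the leaf revisions of one document, and the short hashes are made up for readability.

```javascript
// Hypothetical helper: pick the winning revision among the leaves
// (chain tips) of a single document, following the steps described above.
function pickWinner(leaves) {
    // Step 1: ignore chains that end in a deleted revision.
    const live = leaves.filter((leaf) => !leaf.deleted);
    if (live.length === 0) {
        return null; // simplification: this sketch reports no winner
    }
    const depthOf = (rev) => parseInt(rev.split('-')[0], 10);
    const maxDepth = Math.max(...live.map((leaf) => depthOf(leaf.rev)));
    // Steps 2 and 3: among the deepest revisions, sort descending, take the first.
    const candidates = live
        .map((leaf) => leaf.rev)
        .filter((rev) => depthOf(rev) === maxDepth)
        .sort()
        .reverse();
    return candidates[0];
}

const leaves = [
    { rev: '2-aaa', deleted: false },
    { rev: '3-bbb', deleted: true }, // deepest chain, but deleted
    { rev: '2-ccc', deleted: false }
];

console.log(pickWinner(leaves)); // 2-ccc
```

Because the function depends only on the set of leaves, every client and the server arrive at the same answer once they hold the same revisions.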

Implementing custom conflict resolution strategies

But what if we want an alternate conflict resolution strategy? For example "merge by fields" - If two conflicting revisions have modified different keys of the object we want to auto merge by creating a revision with both the keys. The recommended way of doing this in PouchDB is to:

  1. Create this new revision on any of the chains
  2. Add a revision with _deleted set to true to each of the other chains

The merged revision will now automatically be the winning revision according to the above algorithm. We can do custom resolution either on the server or the client. When the revisions get synced all clients and the server will see the merged revision as the winning revision.
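As a rough illustration of "merge by fields", the sketch below computes the merged document from a common ancestor and the conflicting revisions. The helper names diffFields and mergeByFields are hypothetical, and the sketch only covers computing the merged content; writing it as a new revision and adding the _deleted revisions to the other chains are separate steps.

```javascript
// Hypothetical helper: fields of `revision` whose values differ from `base`.
function diffFields(base, revision) {
    const diff = {};
    for (const key of Object.keys(revision)) {
        if (key.startsWith('_')) continue; // skip revision metadata such as _rev
        if (JSON.stringify(revision[key]) !== JSON.stringify(base[key])) {
            diff[key] = revision[key];
        }
    }
    return diff;
}

// Hypothetical helper: apply each revision's changes on top of the ancestor.
// If two revisions changed the same field, the later one in the array wins.
function mergeByFields(base, revisions) {
    return revisions.reduce(
        (merged, revision) => ({ ...merged, ...diffFields(base, revision) }),
        { ...base }
    );
}

const base = { id: '1', text: 'Buy milk', isCompleted: false };
const mine = { id: '1', text: 'Buy oat milk', isCompleted: false };
const theirs = { id: '1', text: 'Buy milk', isCompleted: true };

const merged = mergeByFields(base, [mine, theirs]);
console.log(merged); // { id: '1', text: 'Buy oat milk', isCompleted: true }
```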

To implement the above conflict resolution strategy, we will need Hasura to also store the revision history and for RxDB to sync revisions while replicating using GraphQL.

Continuing with the Todo app example from the previous post, we will have to update the schema for the Todos table as follows:

todos (
  id: text primary key,
  userId: text,
  text: text,
  createdAt: timestamp,
  isCompleted: boolean,
  deleted: boolean,
  updatedAt: timestamp,
  _revisions: jsonb,
  _rev: text primary key,
  _parent_rev: text,
  _depth: integer
)

Note the additional fields:

  • _rev represents the revision of the record.
  • _parent_rev represents the parent revision of the record
  • _depth is the depth of the record in the revision tree
  • _revisions contains the complete history of revisions of the record.

The primary key for the table is (id, _rev).

Strictly speaking we only need the _revisions field since the other information can be derived from it. But having the other fields readily available makes conflict detection & resolution easier.

We need to set syncRevisions to true while setting up replication:


    async setupGraphQLReplication(auth) {
        const replicationState = this.db.todos.syncGraphQL({
            url: syncURL,
            headers: {
                'Authorization': `Bearer ${auth.idToken}`
            },
            push: {
                batchSize,
                queryBuilder: pushQueryBuilder
            },
            pull: {
                queryBuilder: pullQueryBuilder(auth.userId)
            },

            live: true,
            liveInterval: 1000 * 60 * 10,
            deletedFlag: 'deleted',
            syncRevisions: true,
        });

       ...
    }

We also need to add a text field last_pulled_rev to the RxDB schema. This field is used internally by the plugin to avoid pushing revisions fetched from the server back to the server.

const todoSchema = {
    ...
    'properties': {
        ...
        'last_pulled_rev': {
            'type': 'string'
        }
    },
    ...
};

Finally, we need to change the pull & push query builders to sync revision-related information.

Pull Query Builder

const pullQueryBuilder = (userId) => {
    return (doc) => {
        if (!doc) {
            doc = {
                id: '',
                updatedAt: new Date(0).toUTCString()
            };
        }

        const query = `{
            todos(
                where: {
                    _or: [
                        {updatedAt: {_gt: "${doc.updatedAt}"}},
                        {
                            updatedAt: {_eq: "${doc.updatedAt}"},
                            id: {_gt: "${doc.id}"}
                        }
                    ],
                    userId: {_eq: "${userId}"} 
                },
                limit: ${batchSize},
                order_by: [{updatedAt: asc}, {id: asc}]
            ) {
                id
                text
                isCompleted
                deleted
                createdAt
                updatedAt
                userId
                _rev
                _revisions
            }
        }`;
        return {
            query,
            variables: {}
        };
    };
};

We now fetch the _rev & _revisions fields. The upgraded plugin will use these fields to create local PouchDB revisions.

Push Query Builder


const pushQueryBuilder = doc => {
    const query = `
        mutation InsertTodo($todo: [todos_insert_input!]!) {
            insert_todos(objects: $todo){
                returning {
                  id
                }
            }
       }
    `;

    const depth = doc._revisions.start;
    const parent_rev = depth == 1 ? null : `${depth - 1}-${doc._revisions.ids[1]}`

    const todo = Object.assign({}, doc, {
        _depth: depth,
        _parent_rev: parent_rev
    })

    delete todo['updatedAt']

    const variables = {
        todo: todo
    };

    return {
        query,
        variables
    };
};

With the upgraded plugin, the input parameter doc now contains the _rev and _revisions fields. We pass these on to Hasura in the GraphQL mutation, after adding the _depth and _parent_rev fields to doc.

Earlier we were using an upsert to insert or update a todo record on Hasura. Now since each version ends up being a new record we use the plain old insert mutation instead.

If two different clients now make conflicting changes then both the revisions will be synced and present in Hasura. Both the clients will also eventually receive the other revision. Because PouchDB's conflict resolution strategy is deterministic both the clients will then pick the same version as the "winning revision".

How can we find this winning revision on the server? We will have to implement the same algorithm in SQL.

Implementing CouchDB's conflict resolution algorithm on Postgres

Step 1: Finding leaf nodes not marked as deleted

To do this we need to ignore any versions that have a child revision and any versions that are marked as deleted:

    SELECT
        id,
        _rev,
        _depth
    FROM
        todos
    WHERE
        NOT EXISTS (
            SELECT
                id
            FROM
                todos AS t
            WHERE
                todos.id = t.id
                AND t._parent_rev = todos._rev)
            AND deleted = FALSE

Step 2: Finding the chain with the maximum depth

Assuming we have the results from the above query in a table (or view or a with clause) called leaves, finding the chain with the maximum depth is straightforward:

    SELECT
        id,
        MAX(_depth) AS max_depth
    FROM
        leaves
    GROUP BY
        id

Step 3: Finding winning revisions among revisions with equal max depth

Again, assuming the results from the above query are in a table (or a view or a with clause) called max_depths, we can find the winning revision as follows:

    SELECT
        leaves.id,
        MAX(leaves._rev) AS _rev
    FROM
        leaves
        JOIN max_depths ON leaves.id = max_depths.id
            AND leaves._depth = max_depths.max_depth
    GROUP BY
        leaves.id

Creating a view with winning revisions

Putting together the above three queries we can create a view that shows us the winning revisions as follows:

CREATE OR REPLACE VIEW todos_current_revisions AS
WITH leaves AS (
    SELECT
        id,
        _rev,
        _depth
    FROM
        todos
    WHERE
        NOT EXISTS (
            SELECT
                id
            FROM
                todos AS t
            WHERE
                todos.id = t.id
                AND t._parent_rev = todos._rev)
            AND deleted = FALSE
),
max_depths AS (
    SELECT
        id,
        MAX(_depth) AS max_depth
    FROM
        leaves
    GROUP BY
        id
),
winning_revisions AS (
    SELECT
        leaves.id,
        MAX(leaves._rev) AS _rev
    FROM
        leaves
        JOIN max_depths ON leaves.id = max_depths.id
            AND leaves._depth = max_depths.max_depth
    GROUP BY
        (leaves.id))
SELECT
    todos.*
FROM
    todos
    JOIN winning_revisions ON todos.id = winning_revisions.id
        AND todos._rev = winning_revisions._rev;

Since Hasura can track views and allows querying them via GraphQL, the winning revisions can now be exposed to other clients and services.

Whenever you query the view, Postgres will simply replace the view with the query in the view definition and run the resulting query. If you query the view frequently this might end up leading to a lot of wasted CPU cycles. We can optimize this by using Postgres triggers and storing the winning revisions in a different table.

Using Postgres triggers to calculate winning revisions

Step 1: Create a new table todos_current_revisions

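The schema will be the same as that of the todos table. The primary key will, however, be the id column instead of (id, _rev). Note that Postgres tables and views share a namespace, so if you created the todos_current_revisions view above, drop or rename it first.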

Step 2: Create Postgres trigger

We can write the query for the trigger by starting with the view query. Since the trigger function will run for one row at a time, we can simplify the query:

CREATE OR REPLACE FUNCTION calculate_winning_revision ()
    RETURNS TRIGGER
    AS $BODY$
BEGIN
    INSERT INTO todos_current_revisions WITH leaves AS (
        SELECT
            id,
            _rev,
            _depth
        FROM
            todos
        WHERE
            NOT EXISTS (
                SELECT
                    id
                FROM
                    todos AS t
                WHERE
                    t.id = NEW.id
                    AND t._parent_rev = todos._rev)
                AND deleted = FALSE
                AND id = NEW.id
        ),
        max_depths AS (
            SELECT
                MAX(_depth) AS max_depth
            FROM
                leaves
        ),
        winning_revisions AS (
            SELECT
                MAX(leaves._rev) AS _rev
            FROM
                leaves
                JOIN max_depths ON leaves._depth = max_depths.max_depth
        )
        SELECT
            todos.*
        FROM
            todos
            JOIN winning_revisions ON todos._rev = winning_revisions._rev
                AND todos.id = NEW.id
    ON CONFLICT (id)
        DO UPDATE SET
            _rev = EXCLUDED._rev,
            _revisions = EXCLUDED._revisions,
            _parent_rev = EXCLUDED._parent_rev,
            _depth = EXCLUDED._depth,
            text = EXCLUDED.text,
            "updatedAt" = EXCLUDED."updatedAt",
            deleted = EXCLUDED.deleted,
            "userId" = EXCLUDED."userId",
            "createdAt" = EXCLUDED."createdAt",
            "isCompleted" = EXCLUDED."isCompleted";
    RETURN NEW;
END;
$BODY$
LANGUAGE plpgsql;

CREATE TRIGGER trigger_insert_todos
    AFTER INSERT ON todos
    FOR EACH ROW
    EXECUTE PROCEDURE calculate_winning_revision ();

That's it! We can now query the winning versions both on the server and the client.

Now let us look at implementing custom conflict resolution with Hasura & RxDB.

Custom conflict resolution on the server side

Let us say we want to merge the todos by fields. How do we go about doing this? The gist below shows us this:

That SQL looks like a lot but the only part that deals with the actual merge strategy is this:

CREATE OR REPLACE FUNCTION merge_revisions (item1 jsonb, item2 jsonb)
    RETURNS jsonb
    AS $$
BEGIN
    IF NOT item1 ? 'id' THEN
        RETURN item2;
    ELSE
        RETURN item1 || (item2 -> 'diff');
    END IF;
END;
$$
LANGUAGE plpgsql;

CREATE OR REPLACE AGGREGATE agg_merge_revisions (jsonb) (
    INITCOND = '{}',
    STYPE = jsonb,
    SFUNC = merge_revisions
);

Here we declare a custom Postgres aggregate function agg_merge_revisions to merge elements. The way this works is similar to a 'reduce' function: Postgres will initialize the aggregate value to '{}', then run the merge_revisions function with the current aggregate and the next element to be merged. So if we had 3 conflicting versions to be merged the result would be:

merge_revisions(merge_revisions(merge_revisions('{}', v1), v2), v3)

If we want to implement another strategy we will need to change the merge_revisions function. For example, if we want to implement the 'last write wins' strategy:

CREATE OR REPLACE FUNCTION merge_revisions (item1 jsonb, item2 jsonb)
    RETURNS jsonb
    AS $$
BEGIN
    IF NOT (item1 ? 'id') THEN
        RETURN item2;
    ELSE
        -- compare as timestamps rather than as raw jsonb values
        IF (item2 ->> 'updatedAt')::timestamptz > (item1 ->> 'updatedAt')::timestamptz THEN
            RETURN item2;
        ELSE
            RETURN item1;
        END IF;
    END IF;
END;
$$
LANGUAGE plpgsql;
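For readers more at home in JavaScript, the same 'last write wins' reduction can be sketched as follows. This is only an analogy for how the aggregate folds over the conflicting versions; lastWriteWins and the sample data are illustrative, and updatedAt is assumed to be an ISO 8601 timestamp string.

```javascript
// Illustrative JavaScript mirror of the 'last write wins' merge function above.
function lastWriteWins(item1, item2) {
    // The accumulator starts as {} (like INITCOND = '{}'), so the first
    // element is taken as-is.
    if (!('id' in item1)) {
        return item2;
    }
    return Date.parse(item2.updatedAt) > Date.parse(item1.updatedAt) ? item2 : item1;
}

const versions = [
    { id: '1', text: 'Buy milk', updatedAt: '2020-04-28T10:00:00Z' },
    { id: '1', text: 'Buy oat milk', updatedAt: '2020-04-28T10:05:00Z' },
    { id: '1', text: 'Buy soy milk', updatedAt: '2020-04-28T10:02:00Z' }
];

// Equivalent to lastWriteWins(lastWriteWins(lastWriteWins({}, v1), v2), v3)
const latest = versions.reduce(lastWriteWins, {});
console.log(latest.text); // Buy oat milk
```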

The insert query in the above gist can be run in a post insert trigger to auto-merge conflicts whenever they occur.

Note: Above we have used SQL to implement custom conflict resolution. An alternate approach is to write an Action:

  1. Create a custom mutation to handle the insert instead of the default auto-generated insert mutation.
  2. In the action handler create the new revision of the record. We can use the Hasura insert mutation for this.
  3. Fetch all the revisions for the object using a list query
  4. Detect any conflicts by traversing the revision tree.
  5. Write back the merged version.

This approach will appeal to you if you would prefer writing this logic in a language other than SQL. Another approach is to create a SQL view to show the conflicting revisions and implement the remaining logic in the action handler. This will simplify step 4. above since we can now simply query the view for detecting conflicts.
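Step 4 of the Action approach could look roughly like the sketch below. It assumes the handler has already fetched all revision rows for one record, with the _rev, _parent_rev and deleted columns from the schema above; findLiveLeaves and hasConflict are hypothetical helper names, and the short revision ids are made up.

```javascript
// Hypothetical helper: leaf revisions (no child) that are not deleted.
// `rows` are all stored revisions of a single record.
function findLiveLeaves(rows) {
    const parentRevs = new Set(
        rows.map((row) => row._parent_rev).filter((rev) => rev != null)
    );
    return rows.filter((row) => !parentRevs.has(row._rev) && !row.deleted);
}

// Hypothetical helper: a record is in conflict when more than one live leaf exists.
function hasConflict(rows) {
    return findLiveLeaves(rows).length > 1;
}

const rows = [
    { _rev: '1-a', _parent_rev: null, deleted: false },
    { _rev: '2-b', _parent_rev: '1-a', deleted: false },
    { _rev: '2-c', _parent_rev: '1-a', deleted: false }
];

console.log(findLiveLeaves(rows).map((row) => row._rev)); // [ '2-b', '2-c' ]
console.log(hasConflict(rows)); // true
```

This mirrors the "leaves" query used for the view earlier in the post, just expressed in application code.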

Custom conflict resolution on the client side

There are scenarios where you need user intervention to be able to resolve a conflict. For example, if we were building something like the Trello app and two users modified the description of the same task, you might want to show the user both the versions and let them create a merged version. In these scenarios we will need to resolve the conflict on the client-side.

Client-side conflict resolution is simpler to implement because PouchDB already exposes APIs to query conflicting revisions. If we look at the todos RxDB collection from the previous post, here is how we can fetch the conflicting versions:

todos.pouch.get(todo.id, {
    conflicts: true
})

The above query would populate the conflicting revisions in the _conflicts field in the result. We can then present these to the user for resolution.
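Once the user has chosen how to merge, the resolution can be written back following the approach described earlier: put the merged revision on one chain and delete the others. The sketch below is one possible shape for this, not a prescribed implementation. resolveConflict and chooseFields are hypothetical names; db is assumed to be a PouchDB instance (for example todos.pouch), and only get, put and remove are used.

```javascript
// Hypothetical helper: resolve the conflicts of one document.
// `chooseFields(current, losers)` returns the fields the user settled on.
async function resolveConflict(db, docId, chooseFields) {
    const winner = await db.get(docId, { conflicts: true });
    const conflictRevs = winner._conflicts || [];
    if (conflictRevs.length === 0) {
        return winner; // nothing to resolve
    }

    // Load the losing revisions so they can be shown to the user.
    const losers = await Promise.all(
        conflictRevs.map((rev) => db.get(docId, { rev }))
    );

    // Drop the _conflicts list before writing; keep _id and _rev so the
    // merged document becomes a new revision on the winning chain.
    const { _conflicts, ...current } = winner;
    const merged = { ...current, ...chooseFields(current, losers) };
    await db.put(merged);

    // Mark every other chain as deleted.
    await Promise.all(conflictRevs.map((rev) => db.remove(docId, rev)));
    return merged;
}

// Usage sketch (the merge callback is an assumption):
// resolveConflict(todos.pouch, todo.id, (current, losers) => ({
//     text: `${current.text} / ${losers[0].text}`
// }));
```

With RxDB's revision sync enabled, the merged revision and the deletions are then expected to replicate like any other change.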

PouchDB comes with flexible and powerful constructs for versioning and conflict management. This post showed us how to use these constructs with Hasura/Postgres. In this post we have focused on doing this using plpgsql. We will be doing a follow-up post showing how to do this with Actions so that you can use the language of your choice on the backend!

28 Apr, 2020