Our example program builds from the simple transactional application you created in the Transactional Application chapter and configures write forwarding. The application is network-aware, so you can specify things like host names and ports from the command line. This program has additional error handling for replication errors.
When using replication with write forwarding, there are several benefits for your application code:
You do not need to create an event handler to detect changes of the master.
You do not need to use app_data to track whether the current site is master.
You do not need to provide an error for put operations on the client.
Our program begins with some additional include statements:
/* * File: rep_mgr_wrfor_gsg.c */ include <stdlib.h> include <string.h> include <errno.h> ifdef _WIN32 include <windows.h> define sleep(s) Sleep(1000 * (s)) else /* !_WIN32 */ include <unistd.h> endif #include <db.h> #ifdef _WIN32 extern int getopt(int, char * const *, const char *); #endif
We then define a few values. One is the size of our cache, which we keep deliberately small for this example, and the other is the name of our database. Also, you can define a sleep time, which sets the time that a site waits before it retries synchronizing with the master. We also provide a global variable that is the name of our program; this is used for error reporting later on.
#define CACHESIZE (10 * 1024 * 1024) #define DATABASE "quote.db" #define SLEEPTIME 3 const char *progname = "ex_rep_gsg_wrfor";
Then we perform a couple of forward declarations. The first
of these, create_env()
and
env_init()
are used to open and
initialize our environment.
Next we declare doloop()
. This function
now takes an int
as an argument, which we
describe later.
Finally, we have print_stocks
function.
int create_env(const char *, DB_ENV **); int env_init(DB_ENV *, const char *); int doloop (DB_ENV *, int); int print_stocks(DB *);
Next we need our usage()
function, which
has additional options:
/* Usage function */ static void usage() { fprintf(stderr, "usage: %s ", progname); fprintf(stderr, "-h home -l|-L host:port [-r host:port]\n"); fprintf(stderr, "where:\n"); fprintf(stderr, "\t-h identifies the environment home directory "); fprintf(stderr, "(required).\n"); fprintf(stderr, "\t-l identifies the host and port used by this "); fprintf(stderr, "site (required unless L is specified).\n"); fprintf(stderr, "\t-L identifies the local site as group creator. \n"); fprintf(stderr, "\t-r identifies another site participating in "); fprintf(stderr, "this replication group\n"); exit(EXIT_FAILURE); }
where:
-h
Identifies the environment home directory. You must specify this option.
-l
Identifies the host and port used by this site. You
must specify this option unless -L
is
specified.
-L
Identifies the local site as group creator. You must
specify this option unless -l
is
specified.
-r
Optionally identifies another site participating in this replication group.
That completed, we can jump into our application's
main()
function. We begin by declaring
and initializing some variables used to collect
TCP/IP host and port information. We also declare a couple of
flags that we use to make sure some required information
is provided to this application:
int main(int argc, char *argv[]) { extern char *optarg; DB_ENV *dbenv; const char *home; char ch, *host, *last_colon, *portstr; int is_group_creator, local_is_set, ret; u_int16_t port; dbenv = NULL; ret = is_group_creator = local_is_set = 0; home = NULL;
Now we create and configure our environment handle. We do
this with our create_env()
function, which
we will show later in this example.
if ((ret = create_env(progname, &dbenv)) != 0) goto err;
Then we parse the command line arguments:
while ((ch = getopt(argc, argv, "h:l:L:r:")) != EOF) switch (ch) { case 'h': home = optarg; break; case 'L': is_group_creator = 1; /* FALLTHROUGH */ case 'l': host = optarg; /* * The final colon in host:port string is the * boundary between the host and the port portions * of the string. */ if ((last_colon = strrchr(host, ':')) == NULL ) { fprintf(stderr, "Bad local host specification.\n"); goto err; } /* * Separate the host and port portions of the * string for further processing. */ portstr = last_colon + 1; *last_colon = '\0'; port = (unsigned short)atoi(portstr); if ((ret = dbenv->repmgr_site(dbenv, host, port, &dbsite, 0)) != 0){ fprintf(stderr, "Could not set local address %s:%d.\n", host, port); goto err; } dbsite->set_config(dbsite, DB_LOCAL_SITE, 1); if (is_group_creator) dbsite->set_config(dbsite, DB_GROUP_CREATOR, 1); if ((ret = dbsite->close(dbsite)) != 0) { dbenv->err(dbenv, ret, "DB_SITE->close"); goto err; } local_is_set = 1; break; /* Identify another site in the replication group. */ case 'r': host = optarg; /* * The final colon in host:port string is the * boundary between the host and the port portions * of the string. */ if ((last_colon = strrchr(host, ':')) == NULL ) { fprintf(stderr, "Bad remote host specification.\n"); goto err; } /* * Separate the host and port portions of the * string for further processing. */ portstr = last_colon + 1; *last_colon = '\0'; port = (unsigned short)atoi(portstr); if ((ret = dbenv->repmgr_site(dbenv, host, port, &dbsite, 0)) != 0) { dbenv->err(dbenv, ret, "DB_ENV->repmgr_site"); goto err; } dbsite->set_config(dbsite, DB_BOOTSTRAP_HELPER, 1); if ((ret = dbsite->close(dbsite)) != 0) { dbenv->err(dbenv, ret, "DB_SITE->close"); goto err; } break; case '?': default: usage(); }
Now we can open our environment. We do this with the
env_init()
function which we will
describe a little later in this chapter.
if ((ret = env_init(dbenv, home)) != 0) goto err;
You can now configure and start Replication Manager with
write forwarding. To configure write forwarding, use
rep_set_config
with the
DB_REPMGR_CONF_FORWARD_WRITES
option.
dbenv->rep_set_config(dbenv, DB_REPMGR_CONF_FORWARD_WRITES, 1);
if ((ret = dbenv->repmgr_start(dbenv, 3, DB_REP_ELECTION)) != 0) goto err;
Now that we have opened the environment and configured and
started Replication Manager with write forwarding, we can call
our doloop()
function.
if ((ret = doloop(dbenv, is_group_creator)) != 0) { dbenv->err(dbenv, ret, "Application failed"); goto err; }
Finally, we provide our application shutdown code. Note,
again, that in a traditional transactional application all
databases would be closed here. In our replicated application,
the database will usually be closed in the doloop()
function, but we also conditionally close the database here to handle
some error cases.
err: if (dbenv != NULL) (void)dbenv->close(dbenv, 0); return (ret); }
Having written our main()
function, we
now implement the usual create_env
function.
int create_env(const char *progname, DB_ENV **dbenvp) { DB_ENV *dbenv; int ret; if ((ret = db_env_create(&dbenv, 0)) != 0) { fprintf(stderr, "can't create env handle: %s\n", db_strerror(ret)); return (ret); } dbenv->set_errfile(dbenv, stderr); dbenv->set_errpfx(dbenv, progname); *dbenvp = dbenv; return (0); }
Now we implement the env_init
function,
which also uses the DB_INIT_REP
and
DB_THREAD
flags that are required for
Replication Manager.
int env_init(DB_ENV *dbenv, const char *home) { u_int32_t flags; int ret; (void)dbenv->set_cachesize(dbenv, 0, CACHESIZE, 0); (void)dbenv->set_flags(dbenv, DB_TXN_NOSYNC, 1); /* DB_INIT_REP and DB_THREAD are required for Replication Manager. */ flags = DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_REP | DB_INIT_TXN | DB_RECOVER | DB_THREAD; if ((ret = dbenv->open(dbenv, home, flags, 0)) != 0) dbenv->err(dbenv, ret, "can't open environment"); return (ret); }
Having written our main()
function and
utility functions, we now implement doloop
.
This function provides a command prompt at which the user can
enter a stock ticker value and a price for that value. This
information is then entered to the database.
To display the database, simply enter return
at the prompt.
To begin, we declare a database pointer, several
DBT
variables, and the usual
assortment of variables used for buffers and return codes. We
also initialize all of this. Remember that doloop now takes
is_group_creator
as an additional
argument.
#define BUFSIZE 1024 int doloop(DB_ENV *dbenv, int is_group_creator) { DB *dbp; DBT key, data; char buf[BUFSIZE], *rbuf; int ret; u_int32_t db_flags; dbp = NULL; memset(&key, 0, sizeof(key)); memset(&data, 0, sizeof(data)); ret = 0;
Next, we begin the loop and we immediately open our database if it has not already been opened.
If -L
is set,
is_group_creator
specifies the need to
create the database for the initial group creator startup. The
database will be replicated to the other sites when they first
start up. The database will already exist on each site for
subsequent startups.
Note that there is some logic for a site to retry in case it
needs time to synchronize with the master using
SLEEPTIME
.
for (;;) { if (dbp == NULL) { if ((ret = db_create(&dbp, dbenv, 0)) != 0) return (ret); db_flags = DB_AUTO_COMMIT; /* * Only need to create the database for the * initial group creator startup. The database * will be replicated to the other sites when they * first start up. The database will already exist on * each site for subsequent startups. */ if (is_group_creator) db_flags |= DB_CREATE; if ((ret = dbp->open(dbp, NULL, DATABASE, NULL, DB_BTREE, db_flags, 0)) != 0) { /* Retry in case site needs time to synchronize with master. */ if (ret == ENOENT) { printf( "No stock database yet available.\n"); if ((ret = dbp->close(dbp, 0)) != 0) { dbenv->err(dbenv, ret, "DB->close"); goto err; } dbp = NULL; sleep(SLEEPTIME); continue; } dbenv->err(dbenv, ret, "DB->open"); goto err; }
Now we implement our command prompt. If the user
enters the keywords exit
or
quit
, the loop is exited and the
application ends. If the user enters nothing and instead simply
presses return
, the entire contents of the
database is displayed. We use our
print_stocks()
function to display the
database. (That implementation is shown next in this chapter.)
We also now check for a dead replication handle, which can occur in rare cases when a new master causes a previously committed transaction to be rolled back. In such cases, all database handles must be closed and opened again.
Remember that very little error checking is performed on the data entered at this prompt. If the user fails to enter at least one space in the value string, a simple help message is printed and the prompt is returned to the user.
printf("QUOTESERVER> "); fflush(stdout); if (fgets(buf, sizeof(buf), stdin) == NULL) break; if (strtok(&buf[0], " \t\n") == NULL) { switch ((ret = print_stocks(dbp))) { case 0: continue; case DB_REP_HANDLE_DEAD: /* Must close and reopen the handle, then can retry. */ (void)dbp->close(dbp, 0); dbp = NULL; dbenv->errx(dbenv, "Could not traverse data, retry operation"); continue; default: dbp->err(dbp, ret, "Error traversing data"); goto err; } } rbuf = strtok(NULL, " \t\n"); if (rbuf == NULL || rbuf[0] == '\0') { if (strncmp(buf, "exit", 4) == 0 || strncmp(buf, "quit", 4) == 0) break; dbenv->errx(dbenv, "Format: TICKER VALUE"); continue; }
Now we assign data to the DBT
s that we
will use to write the new information to the database.
key.data = buf; key.size = (u_int32_t)strlen(buf); data.data = rbuf; data.size = (u_int32_t)strlen(rbuf);
Having done that, we can write the new information to the database. Here, the reason we do not need an explicit commit on this put operation is that it uses the implicit NULL txnid, so each one is automatically committed. Also, the application retries if a deadlock, timeout or permission error occurs. A forwarded put operation can return a timeout error if the operation takes too long and a permission error if there is currently no master.
if ((ret = dbp->put(dbp, NULL, &key, &data, 0)) != 0) { dbp->err(dbp, ret, "DB->put"); switch (ret) { case DB_REP_HANDLE_DEAD: /* Must close and reopen the handle, then can retry. */ (void)dbp->close(dbp, 0); dbp = NULL; /* FALLTHROUGH */ case DB_LOCK_DEADLOCK: case DB_TIMEOUT: case EACCES: dbenv->errx(dbenv, "Could not update data, retry operation"); continue; default: dbp->err(dbp, ret, "Error updating data"); goto err; } }
Finally, we close our database before returning from the function.
err: if (dbp != NULL) (void)dbp->close(dbp, 0); return (ret); }
This function is unmodified from when we originally introduced it. For details on that function, see Function: print_stocks() .