Our example program builds on the simple transactional application you created in the Transactional Application chapter and configures write forwarding. The application is network-aware, so you can specify host names and ports from the command line. The program also adds error handling for replication errors.
When using replication with write forwarding, there are several benefits for your application code:
You do not need to create an event handler to detect changes of the master.
You do not need to use app_data to track whether the current site is master.
You do not need to return an application-level error for put operations attempted on a client, because the write is forwarded to the master.
Before we begin, we present a class that maintains the configuration information our example needs. The class is called RepConfigInfo.
#include <db_cxx.h>

class RepConfigInfo {
public:
    RepConfigInfo();
    virtual ~RepConfigInfo();

    void addOtherHost(char* host, int port);
public:
    u_int32_t start_policy;
    char* home;
    bool got_listen_address;
    REP_HOST_INFO this_host;
    int nrsites;
    // used to store a set of optional other hosts.
    REP_HOST_INFO *other_hosts;
};

RepConfigInfo::RepConfigInfo()
{
    start_policy = DB_REP_ELECTION;
    home = "TESTDIR";
    got_listen_address = false;
    // init() reads this flag even when -L is not given.
    this_host.creator = false;
    nrsites = 0;
    other_hosts = NULL;
}

RepConfigInfo::~RepConfigInfo()
{
    // release any other_hosts structs.
    if (other_hosts != NULL) {
        REP_HOST_INFO *CurItem = other_hosts;
        while (CurItem->next != NULL) {
            REP_HOST_INFO *TmpItem = CurItem->next;
            free(CurItem);
            CurItem = TmpItem;
        }
        free(CurItem);
    }
    other_hosts = NULL;
}
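The listing above declares addOtherHost() and uses REP_HOST_INFO without showing either definition. As a rough illustration only, the following sketch shows one way such a singly linked host list could be maintained. HostNode and HostList are hypothetical stand-ins, not the types from the example source; the field names are inferred from how the example uses them (host, port, creator, next), and prepending new entries is an assumption.

```cpp
#include <cstdlib>

// Hypothetical stand-in for REP_HOST_INFO. The fields are inferred from
// their use in the example; the real definition is not shown here.
struct HostNode {
    char *host;
    unsigned short port;
    bool creator;
    HostNode *next;
};

// Hypothetical stand-in for the host-list part of RepConfigInfo.
struct HostList {
    HostNode *head;
    int count;

    HostList() : head(nullptr), count(0) {}
    HostList(const HostList &) = delete;
    HostList &operator=(const HostList &) = delete;

    // Frees every node, mirroring the malloc()/free() pairing that the
    // RepConfigInfo destructor implies.
    ~HostList() {
        while (head != nullptr) {
            HostNode *next = head->next;
            free(head);
            head = next;
        }
    }

    // Prepends a host to the list. Returns false if allocation fails.
    bool add(char *host, int port) {
        HostNode *node = static_cast<HostNode *>(malloc(sizeof(HostNode)));
        if (node == nullptr)
            return false;
        node->host = host;  // not copied; the caller keeps the string alive
        node->port = static_cast<unsigned short>(port);
        node->creator = false;
        node->next = head;
        head = node;
        ++count;
        return true;
    }
};
```

Because the host string is stored by pointer, this approach only works when the string outlives the list, as optarg does for the lifetime of main().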
Our example will instantiate a class, RepMgrWrforGSG, that performs all our work for us. Before we implement our main() function, we show the RepMgrWrforGSG class declaration.
First, we provide some declarations and definitions that are needed later in our example. One is the size of our cache, which we keep deliberately small for this example, and the other is the name of our database. Also, you can define a sleep time, which sets the time that a site waits before it retries synchronizing with the master. We also provide a global variable that is the name of our program; this is used for error reporting later on.
#include <iostream>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <db_cxx.h>
#include "RepWrforConfigInfo.h"

using std::cout;
using std::cin;
using std::cerr;
using std::endl;
using std::flush;

#define CACHESIZE (10 * 1024 * 1024)
#define DATABASE "quote.db"
#define SLEEPTIME 3

const char *progname = "excxx_repquote_gsg_wrfor";

#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <direct.h>
#define sleep(s) Sleep(1000 * (s))

extern "C" {
    extern int getopt(int, char * const *, const char *);
    extern char *optarg;
}
#else
// getopt() and sleep() are declared here on POSIX systems.
#include <unistd.h>
#endif
And then we define our RepMgrWrforGSG class:
class RepMgrWrforGSG
{
public:
    // Constructor.
    RepMgrWrforGSG();
    // Initialization method. Creates and opens our environment handle.
    int init(RepConfigInfo* config);
    // The doloop is where all the work is performed.
    int doloop();
    // terminate() provides our shutdown code.
    int terminate();

private:
    // disable copy constructor.
    RepMgrWrforGSG(const RepMgrWrforGSG &);
    void operator = (const RepMgrWrforGSG &);

    // internal data members.
    RepConfigInfo *app_config;
    DbEnv dbenv;

    // private methods.
    // print_stocks() is used to display the contents of our database.
    static int print_stocks(Db *dbp);
};
Note that we show the implementation of the various RepMgrWrforGSG methods later in this section.
Our usage() function manages the following command line arguments:
static void usage()
{
    cerr << "usage: " << progname << endl
         << "-h home -l|-L host:port [-r host:port]" << endl;

    cerr << "\t -h home directory (required)" << endl
         << "\t -l host:port (required unless -L is specified;"
         << "\t l stands for local)" << endl
         << "\t -L host:port (optional, L means group creator)" << endl
         << "\t -r host:port (optional; r stands for remote; any "
         << "number of these" << endl
         << "\t may be specified)" << endl;

    exit(EXIT_FAILURE);
}
where:
-h
Identifies the environment home directory. You must specify this option.
-l
Identifies the host and port used by this site. You must specify this option unless -L is specified.
-L
Identifies the host and port used by this site and designates the site as the group creator. You must specify this option unless -l is specified.
-r
Optionally identifies another site participating in this replication group. You can specify this option any number of times.
Now we provide our main() function. This is a trivial function whose only job is to collect command line information, then instantiate a RepMgrWrforGSG object, run it, then terminate it.
We begin by declaring some useful variables. Of these, note that we instantiate our RepConfigInfo object here. Recall that this is used to store information useful to our code.
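int main(int argc, char **argv)
{
    RepConfigInfo config;
    // getopt() returns an int; storing it in a char breaks the EOF
    // comparison on platforms where char is unsigned.
    int ch;
    char *last_colon, *portstr, *tmphost;
    int tmpport;
    int ret;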
Then we collect our command line information:
    // Extract the command line parameters
    while ((ch = getopt(argc, argv, "h:l:L:r:")) != EOF) {
        switch (ch) {
        case 'h':
            config.home = optarg;
            break;
        case 'L':
            config.this_host.creator = true; // FALLTHROUGH
        case 'l':
            config.this_host.host = optarg;
            /*
             * The final colon in host:port string is the
             * boundary between the host and the port portions
             * of the string.
             */
            if ((last_colon = strrchr(optarg, ':')) == NULL) {
                cerr << "Bad local host specification." << endl;
                usage();
            }
            /*
             * Separate the host and port portions of the
             * string for further processing.
             */
            portstr = last_colon + 1;
            *last_colon = '\0';
            config.this_host.port = (unsigned short)atoi(portstr);
            config.got_listen_address = true;
            break;
        case 'r':
            tmphost = optarg;
            /*
             * The final colon in host:port string is the
             * boundary between the host and the port portions
             * of the string.
             */
            if ((last_colon = strrchr(tmphost, ':')) == NULL) {
                cerr << "Bad remote host specification." << endl;
                usage();
            }
            /*
             * Separate the host and port portions of the
             * string for further processing.
             */
            portstr = last_colon + 1;
            *last_colon = '\0';
            tmpport = (unsigned short)atoi(portstr);
            config.addOtherHost(tmphost, tmpport);
            break;
        case '?':
        default:
            usage();
        }
    }

    // Error check command line.
    if ((!config.got_listen_address) || config.home == NULL)
        usage();
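The -l, -L, and -r cases repeat the same host:port splitting logic, and atoi() silently turns a malformed port into 0. As a hedged sketch, the splitting could be factored into a helper such as the one below. split_host_port() is a hypothetical name that does not appear in the example program, and the use of strtol() with a range check of 1 to 65535 is an assumption about what counts as a valid port.

```cpp
#include <cerrno>
#include <cstdlib>
#include <cstring>

// Hypothetical helper, not part of the example program.
// Splits "host:port" in place at the final colon. On success the string
// is truncated to the host portion and *port receives the port number.
// On failure the string is left unmodified.
bool split_host_port(char *spec, unsigned short *port)
{
    char *last_colon = strrchr(spec, ':');
    if (last_colon == nullptr || last_colon == spec || last_colon[1] == '\0')
        return false;  // no colon, empty host, or empty port

    char *end = nullptr;
    errno = 0;
    long value = strtol(last_colon + 1, &end, 10);
    if (errno != 0 || *end != '\0' || value < 1 || value > 65535)
        return false;  // not a number, trailing characters, or out of range

    *last_colon = '\0';
    *port = static_cast<unsigned short>(value);
    return true;
}
```

Splitting at the final colon, as the example does with strrchr(), means a host portion that itself contains colons still parses: everything before the last colon is treated as the host.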
Now we instantiate and initialize our RepMgrWrforGSG class, which is responsible for doing all our real work. The RepMgrWrforGSG::init() method creates and opens our environment handle.
    RepMgrWrforGSG runner;
    try {
        if((ret = runner.init(&config)) != 0)
            goto err;
Then we call the RepMgrWrforGSG::doloop() method, which is where the actual transactional work is performed for this application.
if((ret = runner.doloop()) != 0) goto err;
Finally, catch exceptions and terminate the program. Note, again, that in a traditional transactional application all databases would be closed here. In our replicated application, the database is opened and closed in the doloop() method, which also closes it on its error path, so main() only needs to call terminate() to close the environment handle.
    } catch (DbException dbe) {
        cerr << "Caught an exception during initialization or"
             << " processing: " << dbe.what() << endl;
    }
err:
    runner.terminate();
    return 0;
}
The RepMgrWrforGSG::init() method is used to create and open our environment handle.
First, we show the class constructor implementation, which is only used to initialize a few variables:
RepMgrWrforGSG::RepMgrWrforGSG() : app_config(0), dbenv((u_int32_t)0) { }
We now provide the init() method implementation. You can now configure and start Replication Manager with write forwarding. To configure write forwarding, use rep_set_config with the DB_REPMGR_CONF_FORWARD_WRITES option:
int RepMgrWrforGSG::init(RepConfigInfo *config)
{
    int ret = 0;

    app_config = config;

    dbenv.set_errfile(stderr);
    dbenv.set_errpfx(progname);

    DbSite *dbsite;
    dbenv.repmgr_site(app_config->this_host.host,
        app_config->this_host.port, &dbsite, 0);
    dbsite->set_config(DB_LOCAL_SITE, 1);
    if (app_config->this_host.creator)
        dbsite->set_config(DB_GROUP_CREATOR, 1);
    dbsite->close();

    int i = 1;
    for ( REP_HOST_INFO *cur = app_config->other_hosts;
        cur != NULL && i <= app_config->nrsites;
        cur = cur->next, i++) {
        dbenv.repmgr_site(cur->host, cur->port, &dbsite, 0);
        dbsite->set_config(DB_BOOTSTRAP_HELPER, 1);
        dbsite->close();
    }

    // We can now open our environment, although we're not ready to
    // begin replicating. However, we want to have a dbenv around
    // so that we can send it into any of our message handlers.
    dbenv.set_cachesize(0, CACHESIZE, 0);
    dbenv.set_flags(DB_TXN_NOSYNC, 1);

    try {
        dbenv.open(app_config->home, DB_CREATE | DB_RECOVER |
            DB_THREAD | DB_INIT_REP | DB_INIT_LOCK | DB_INIT_LOG |
            DB_INIT_MPOOL | DB_INIT_TXN, 0);
    } catch(DbException dbe) {
        cerr << "Caught an exception during DB environment open." << endl
             << "Ensure that the home directory is created prior "
             << "to starting the application." << endl;
        ret = ENOENT;
        goto err;
    }

    /* Configure Replication Manager write forwarding. */
    dbenv.rep_set_config(DB_REPMGR_CONF_FORWARD_WRITES, 1);

    if ((ret = dbenv.repmgr_start(3, app_config->start_policy)) != 0)
        goto err;

err:
    return ret;
}
Finally, we present the RepMgrWrforGSG::terminate() method here. All this does is close the environment handle.
int RepMgrWrforGSG::terminate()
{
    try {
        dbenv.close(0);
    } catch (DbException dbe) {
        cerr << "error closing environment: " << dbe.what() << endl;
    }
    return 0;
}
Having written our main() function and supporting utility methods, we now implement our application's primary data processing method. This method provides a command prompt at which the user can enter a stock ticker value and a price for that value. This information is then written to the database.
To display the database, simply press return at the prompt.
To begin, we declare a database pointer, several Dbt variables, and the usual assortment of variables used for buffers and return codes. We also initialize all of this.
#define BUFSIZE 1024

int RepMgrWrforGSG::doloop()
{
    Dbt key, data;
    Db *dbp;
    char buf[BUFSIZE], *rbuf;
    int ret;

    dbp = 0;
    memset(&key, 0, sizeof(key));
    memset(&data, 0, sizeof(data));
    ret = 0;
Next, we begin the loop and we immediately open our database if it has not already been opened.
If -L is set, the site creates the database as part of the initial group creator startup. The database will be replicated to the other sites when they first start up. The database will already exist on each site for subsequent startups.
Note that if the database is not yet available, the site waits SLEEPTIME seconds and then retries the open, which gives it time to synchronize with the master.
    for (;;) {
        if (dbp == 0) {
            dbp = new Db(&dbenv, 0);

            try {
                dbp->open(NULL, DATABASE, NULL, DB_BTREE,
                    app_config->this_host.creator ?
                    DB_CREATE | DB_AUTO_COMMIT : DB_AUTO_COMMIT, 0);
            } catch(DbException dbe) {
                // It is expected that this condition will
                // be triggered when client sites start up. It can
                // take a while for the master site to be found
                // and synced, and no DB will be available until then.
                if (dbe.get_errno() == ENOENT) {
                    cout << "No stock db available yet - retrying." << endl;
                    try {
                        dbp->close(0);
                    } catch (DbException dbe2) {
                        cout << "Unexpected error closing after failed"
                             << " open, message: " << dbe2.what() << endl;
                        dbp = NULL;
                        goto err;
                    }
                    dbp = NULL;
                    sleep(SLEEPTIME);
                    continue;
                } else {
                    // ret has not been set at this point, so report
                    // the error code carried by the exception.
                    dbenv.err(dbe.get_errno(), "DB->open");
                    throw dbe;
                }
            }
        }
Now we implement our command prompt.
If the user enters the keywords exit or quit, the loop is exited and the application ends. If the user enters nothing and instead simply presses return, the entire contents of the database are displayed. We use our print_stocks() method to display the database. (That implementation is shown next in this chapter.)
We also now check for a dead replication handle, which can occur in rare cases when a new master causes a previously committed transaction to be rolled back. In such cases, all database handles must be closed and opened again.
Remember that very little error checking is performed on the data entered at this prompt. If the user fails to enter at least one space in the value string, a simple help message is printed and the prompt is returned to the user.
        cout << "QUOTESERVER" ;
        cout << "> " << flush;

        if (fgets(buf, sizeof(buf), stdin) == NULL)
            break;
        if (strtok(&buf[0], " \t\n") == NULL) {
            switch ((ret = print_stocks(dbp))) {
            case 0:
                continue;
            case DB_REP_HANDLE_DEAD:
                (void)dbp->close(DB_NOSYNC);
                cout << "closing db handle due to rep handle dead" << endl;
                dbp = NULL;
                continue;
            default:
                dbp->err(ret, "Error traversing data");
                goto err;
            }
        }
        rbuf = strtok(NULL, " \t\n");
        if (rbuf == NULL || rbuf[0] == '\0') {
            if (strncmp(buf, "exit", 4) == 0 ||
                strncmp(buf, "quit", 4) == 0)
                break;
            dbenv.errx("Format: TICKER VALUE");
            continue;
        }
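The prompt logic above distinguishes four cases: an empty line, a single exit or quit keyword, a single token that is not a keyword, and a ticker followed by a value. The following sketch restates that decision table using std::string rather than strtok(), purely as an illustration. Command, ParsedLine, and parse_line() are hypothetical names. One deliberate difference is an assumption of this sketch: it matches the keywords exactly, whereas the strncmp() calls above accept any single token that begins with exit or quit.

```cpp
#include <sstream>
#include <string>

// Hypothetical names for illustration; none of these appear in the example.
enum class Command { Show, Quit, Put, BadFormat };

struct ParsedLine {
    Command command;
    std::string ticker;
    std::string value;
};

// Classifies one line of prompt input. Tokens are separated by
// whitespace, and any tokens after the second are ignored.
ParsedLine parse_line(const std::string &line)
{
    std::istringstream in(line);
    ParsedLine result{Command::BadFormat, "", ""};

    if (!(in >> result.ticker)) {
        // Nothing but whitespace: display the database.
        result.command = Command::Show;
        return result;
    }
    if (!(in >> result.value)) {
        // A single token is either a keyword or a format error.
        bool quit = (result.ticker == "exit" || result.ticker == "quit");
        result.command = quit ? Command::Quit : Command::BadFormat;
        return result;
    }
    result.command = Command::Put;
    return result;
}
```

As in the original loop, a line such as "exit 5" is treated as a ticker and value pair, not as a request to quit, because the keyword check only applies when no second token is present.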
Now we assign data to the Dbts that we will use to write the new information to the database.
        key.set_data(buf);
        key.set_size((u_int32_t)strlen(buf));

        data.set_data(rbuf);
        data.set_size((u_int32_t)strlen(rbuf));
Having done that, we can write the new information to the database. We do not need an explicit commit for this put operation because it passes a NULL txnid to a database that was opened with DB_AUTO_COMMIT, so each put is automatically committed. If a deadlock, timeout, or permission error occurs, the application reports the error and returns to the prompt so that the operation can be retried. A forwarded put operation can return a timeout error if the operation takes too long and a permission error if there is currently no master.
        if ((ret = dbp->put(NULL, &key, &data, 0)) != 0) {
            dbp->err(ret, "DB->put");
            switch (ret) {
            case DB_REP_HANDLE_DEAD:
                /* Must close and reopen the handle, then can retry. */
                (void)dbp->close(0);
                dbp = NULL;
                /* FALLTHROUGH */
            case DB_LOCK_DEADLOCK:
            case DB_TIMEOUT:
            case EACCES:
                dbenv.errx("Could not update data, retry operation");
                /* FALLTHROUGH */
            case DB_KEYEXIST:
                continue;
            default:
                dbp->err(ret, "Error updating data");
                goto err;
            }
        }
    }
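The code above leaves the retry to the user: it prints a message and returns to the prompt. An application that prefers to retry automatically could wrap the put in a bounded retry loop. The sketch below is one possible shape for that, not something the example program does. retry_transient() and is_transient() are hypothetical names, and SKETCH_LOCK_DEADLOCK and SKETCH_TIMEOUT are made-up stand-ins for DB_LOCK_DEADLOCK and DB_TIMEOUT so that the sketch compiles without the Berkeley DB headers. DB_REP_HANDLE_DEAD is left out on purpose because it requires reopening the handle first.

```cpp
#include <cerrno>
#include <functional>

// Hypothetical stand-ins with made-up values. A real program would use
// DB_LOCK_DEADLOCK and DB_TIMEOUT from db_cxx.h instead.
const int SKETCH_LOCK_DEADLOCK = -1001;
const int SKETCH_TIMEOUT = -1002;

// Returns true for errors that the text describes as worth retrying.
bool is_transient(int ret)
{
    return ret == SKETCH_LOCK_DEADLOCK || ret == SKETCH_TIMEOUT ||
        ret == EACCES;
}

// Runs op until it succeeds, fails with a non-transient error, or has
// been tried max_attempts times. pause(attempt) is called between tries
// so that the caller decides how long to wait.
int retry_transient(const std::function<int()> &op, int max_attempts,
    const std::function<void(int)> &pause)
{
    if (max_attempts < 1)
        max_attempts = 1;  // always try at least once

    int ret = 0;
    for (int attempt = 1; attempt <= max_attempts; ++attempt) {
        ret = op();
        if (ret == 0 || !is_transient(ret))
            return ret;
        if (attempt < max_attempts)
            pause(attempt);
    }
    return ret;  // last transient error after exhausting all attempts
}
```

In a program like this one, op would be a lambda that calls dbp->put(NULL, &key, &data, 0) and pause could call sleep(). Bounding the number of attempts matters for the permission error in particular, since the group may remain without a master for some time.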
Finally, we close our database before returning from the method.
err:
    if (dbp != NULL) {
        (void)dbp->close(DB_NOSYNC);
        cout << "database closed" << endl;
    }

    return (ret);
}
The print_stocks() method is unmodified from when we originally introduced it. For details on that method, see Method: SimpleTxn::print_stocks().