This feature provides incremental loading of memberships for SQL-based loader jobs.  You can add database triggers to your existing loader tables to populate a new table that records which users have been updated for which jobs.  Alternatively, you can add messages to a message queue and have Grouper look for changes there.  The incremental loader can then process updates quickly, for just those users.

If you use this with SQL_GROUP_LIST, the loader job must have grouperLoaderGroupsLike configured.  Also, for SQL_GROUP_LIST, if a change refers to a new group that doesn't exist in Grouper yet, the incremental loader will trigger a full sync.

Configuration

By default, multiple threads are used to improve performance.  The default settings are shown below and can be adjusted in grouper-loader.properties.

loader.incrementalThreads=true
loader.incrementalThreadPoolSize=10

 

In grouper-loader.properties, configure your incremental jobs.  You can have multiple jobs, which would mainly be relevant if you use multiple databases for your loader jobs.

otherJob.incrementalLoader1.class = edu.internet2.middleware.grouper.app.loader.GrouperLoaderIncrementalJob
otherJob.incrementalLoader1.quartzCron = 0 * * * * ?
otherJob.incrementalLoader1.databaseName=warehouse
otherJob.incrementalLoader1.tableName=myincrementaltable
 
otherJob.incrementalLoader2.class = edu.internet2.middleware.grouper.app.loader.GrouperLoaderIncrementalJob
otherJob.incrementalLoader2.quartzCron = 0 * * * * ?
otherJob.incrementalLoader2.databaseName=warehouse2
otherJob.incrementalLoader2.tableName=myincrementaltable

 

Database setup

Your incremental table should have the following columns:

  • id - primary key (e.g. an auto-increment column or a sequence)
  • subject_id - mutually exclusive with subject_identifier/subject_id_or_identifier
  • subject_identifier - mutually exclusive with subject_id/subject_id_or_identifier
  • subject_id_or_identifier - mutually exclusive with subject_id/subject_identifier
  • source_id - optional
  • loader_group_name - name of the Grouper group that defines the SQL_SIMPLE or SQL_GROUP_LIST loader job.
  • timestamp - time of the change, in milliseconds since the epoch.  Your trigger should populate this.
  • completed_timestamp - milliseconds.  The incremental loader will populate this when the row is processed.  After one day, the row will be deleted.

Example table definition for HSQLDB:
 
CREATE TABLE myincrementaltable
(
    id INTEGER NOT NULL,
    subject_id VARCHAR(255),
    subject_identifier VARCHAR(255),
    subject_id_or_identifier VARCHAR(255),
    source_id VARCHAR(255),
    loader_group_name VARCHAR(1024) NOT NULL,
    timestamp BIGINT NOT NULL,
    completed_timestamp BIGINT,
    PRIMARY KEY (id)
);
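
To make the life cycle of a row concrete, here is a minimal Python sketch that uses an in-memory SQLite database as a stand-in for your loader database. It is not how Grouper itself is implemented: the helper names (create_table, pending_rows, mark_completed, purge_completed) are hypothetical, and it assumes that an unprocessed row is one whose completed_timestamp is still null.

```python
import sqlite3

DAY_MS = 24 * 60 * 60 * 1000  # one day in milliseconds


def create_table(conn):
    # SQLite dialect of the incremental table; id is populated automatically.
    conn.execute("""
        CREATE TABLE myincrementaltable (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            subject_id VARCHAR(255),
            subject_identifier VARCHAR(255),
            subject_id_or_identifier VARCHAR(255),
            source_id VARCHAR(255),
            loader_group_name VARCHAR(1024) NOT NULL,
            timestamp BIGINT NOT NULL,
            completed_timestamp BIGINT
        )""")


def pending_rows(conn):
    # Assumption: rows not yet processed have no completed_timestamp.
    return conn.execute(
        "SELECT id, subject_id, loader_group_name FROM myincrementaltable "
        "WHERE completed_timestamp IS NULL ORDER BY id").fetchall()


def mark_completed(conn, row_id, at_ms):
    # Record when the row was processed (milliseconds).
    conn.execute(
        "UPDATE myincrementaltable SET completed_timestamp = ? WHERE id = ?",
        (at_ms, row_id))


def purge_completed(conn, at_ms):
    # Delete rows that were completed more than one day before at_ms.
    cur = conn.execute(
        "DELETE FROM myincrementaltable "
        "WHERE completed_timestamp IS NOT NULL AND completed_timestamp < ?",
        (at_ms - DAY_MS,))
    return cur.rowcount
```

Passing the current time in explicitly (at_ms) keeps the helpers easy to test; in practice you would pass the current epoch time in milliseconds.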

Messaging

If you trigger updates via messaging instead of database triggers, the table above is still needed.  A built-in message listener monitors your queue and adds rows to the table.  Note that the "id" column must be populated automatically on insert (i.e. use an auto-increment column, or a trigger that populates it from a sequence before the insert).

In your grouper-loader.properties file, add the following configuration for a new message listener:

messaging.listener.myCustomMessagingListener.class = edu.internet2.middleware.grouper.app.loader.GrouperLoaderIncrementalMessagingListener
messaging.listener.myCustomMessagingListener.quartzCron = 0 * * * * ?
messaging.listener.myCustomMessagingListener.messagingSystemName = grouperBuiltinMessaging
messaging.listener.myCustomMessagingListener.queueName = abc
messaging.listener.myCustomMessagingListener.numberOfTriesPerIteration = 3
messaging.listener.myCustomMessagingListener.pollingTimeoutSeconds = 18
messaging.listener.myCustomMessagingListener.sleepSecondsInBetweenIterations = 0
messaging.listener.myCustomMessagingListener.maxMessagesToReceiveAtOnce = 20
# if there are 20 messages to receive at once, then do this 50 times per call max
messaging.listener.myCustomMessagingListener.maxOuterLoops = 50
messaging.listener.myCustomMessagingListener.incrementalLoaderJobName = incrementalLoader1
  • Set messagingSystemName to your messaging system (Grouper supports a built-in messaging system along with RabbitMQ, AWS, etc.).
  • Set queueName to the queue to monitor.
  • Set incrementalLoaderJobName to the job name configured in the Configuration section above (e.g. incrementalLoader1).

Format of messages:

{'subjectId':'test.subject.0', 'loaderGroupName':'test:owner', 'sourceId':'jdbc'}
  • Must have subjectId, subjectIdentifier or subjectIdOrIdentifier.
  • Must have loaderGroupName
  • sourceId is optional
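
The rules above can be sketched as a small Python helper that builds a message body before you publish it to your queue. This is only an illustration: build_message is a hypothetical name, the sketch assumes exactly one of the three subject keys may be given (mirroring the mutually exclusive table columns), and it emits standard double-quoted JSON on the assumption that the listener accepts it as well as the single-quoted form shown above.

```python
import json

# The three alternative ways to identify the subject in a message.
SUBJECT_KEYS = ("subjectId", "subjectIdentifier", "subjectIdOrIdentifier")


def build_message(loader_group_name, source_id=None, **subject):
    """Return a JSON message body for the incremental loader listener."""
    unknown = set(subject) - set(SUBJECT_KEYS)
    if unknown:
        raise ValueError("unknown keys: %s" % sorted(unknown))
    given = [key for key in SUBJECT_KEYS if subject.get(key)]
    if len(given) != 1:
        # Assumption: exactly one subject key, like the table columns.
        raise ValueError("provide exactly one of %s" % (SUBJECT_KEYS,))
    if not loader_group_name:
        raise ValueError("loaderGroupName is required")
    message = {given[0]: subject[given[0]],
               "loaderGroupName": loader_group_name}
    if source_id:
        message["sourceId"] = source_id  # optional
    return json.dumps(message)
```

For example, build_message("test:owner", source_id="jdbc", subjectId="test.subject.0") produces a body equivalent to the sample message above.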

Fail safe

If a loader job has more than a certain number of pending changes, the incremental loader skips processing those changes individually and instead immediately triggers a full sync.  This helps performance and also delegates fail-safe checks to the full sync, which has its own configuration options.  The default threshold is 100 and can be adjusted in grouper-loader.properties:

otherJob.incrementalLoader1.fullSyncThreshold=100
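
The threshold decision can be pictured with the following Python sketch. It is an illustration of the rule described above, not Grouper's actual code: plan_jobs is a hypothetical name, and the sketch assumes pending changes are counted per loader_group_name and that "more than" means strictly greater than the threshold.

```python
from collections import Counter


def plan_jobs(pending_rows, full_sync_threshold=100):
    """Decide, per loader job, between incremental processing and a full sync.

    pending_rows is an iterable of dicts that each carry 'loader_group_name'.
    """
    counts = Counter(row["loader_group_name"] for row in pending_rows)
    return {
        name: "full_sync" if count > full_sync_threshold else "incremental"
        for name, count in counts.items()
    }
```

With the default threshold, a job with 101 pending rows would be sent to a full sync, while a job with 100 or fewer would be processed incrementally.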

 

 

Examples

Example of SQL_SIMPLE using Oracle (loader table has a group name field to allow multiple SQL_SIMPLE jobs):
Say you have a loader table that looks like the following:
 
CREATE TABLE myloadertable
(
    subject_id VARCHAR(255),
    group_name VARCHAR(1024)
);
 
With the following incremental table:
 
CREATE TABLE myincrementaltable
(
    id NUMBER NOT NULL,
    subject_id VARCHAR(255),
    subject_identifier VARCHAR(255),
    subject_id_or_identifier VARCHAR(255),
    source_id VARCHAR(255),
    loader_group_name VARCHAR(1024) NOT NULL,
    timestamp NUMBER NOT NULL,
    completed_timestamp NUMBER,
    PRIMARY KEY (id)
);
 
And a sequence for the primary key on the incremental table:
 
CREATE SEQUENCE myincrementaltable_seq;
 
And the following loader job:
addRootStem("test", "test")
addGroup("test", "loader1", "loader1")
groupAddType("test:loader1", "grouperLoader")
setGroupAttr("test:loader1", "grouperLoaderDbName", "grouper")
setGroupAttr("test:loader1", "grouperLoaderType", "SQL_SIMPLE")
setGroupAttr("test:loader1", "grouperLoaderScheduleType", "START_TO_START_INTERVAL")
setGroupAttr("test:loader1", "grouperLoaderQuery", "select subject_id from myloadertable where group_name = 'test:loader1'")
setGroupAttr("test:loader1", "grouperLoaderIntervalSeconds", "86400")
 
Assuming this is on the same database as Grouper, you could add the following configuration in grouper-loader.properties (run every 5 seconds):
otherJob.incrementalLoader1.class = edu.internet2.middleware.grouper.app.loader.GrouperLoaderIncrementalJob
otherJob.incrementalLoader1.quartzCron = 0/5 * * * * ?
otherJob.incrementalLoader1.databaseName=grouper
otherJob.incrementalLoader1.tableName=myincrementaltable
 
And the following trigger:
 
CREATE OR REPLACE TRIGGER mytrigger
AFTER INSERT OR DELETE OR UPDATE ON myloadertable
FOR EACH ROW
DECLARE
  timemillis NUMBER;
BEGIN
  select extract(day from(sys_extract_utc(systimestamp) - to_timestamp('1970-01-01', 'YYYY-MM-DD'))) * 86400000 
     + to_number(to_char(sys_extract_utc(systimestamp), 'SSSSSFF3')) into timemillis from dual;
  IF (:new.subject_id is not null) THEN
    INSERT INTO myincrementaltable (id, subject_id, loader_group_name, timestamp) values (myincrementaltable_seq.nextval, :new.subject_id, :new.group_name, timemillis);
  END IF;
  IF (:old.subject_id is not null) THEN
    INSERT INTO myincrementaltable (id, subject_id, loader_group_name, timestamp) values (myincrementaltable_seq.nextval, :old.subject_id, :old.group_name, timemillis);
  END IF;
END;
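
The timestamp expression in the trigger above computes milliseconds since the epoch as (whole days since 1970-01-01) * 86400000 plus the milliseconds elapsed since midnight UTC, where the format 'SSSSSFF3' yields the seconds past midnight followed by the first three fractional digits. The following Python sketch reproduces that arithmetic so you can check it by hand; oracle_style_millis is a hypothetical name used only for illustration.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def oracle_style_millis(ts):
    """Mirror the trigger's arithmetic for a timezone-aware UTC datetime."""
    # extract(day from (ts - epoch)): whole days since 1970-01-01
    days = (ts - EPOCH).days
    # 'SSSSS': seconds past midnight; 'FF3': first three fractional digits
    seconds_past_midnight = ts.hour * 3600 + ts.minute * 60 + ts.second
    fraction_millis = ts.microsecond // 1000
    # Concatenating SSSSS and FF3 gives milliseconds since midnight
    millis_since_midnight = int("%05d%03d" % (seconds_past_midnight, fraction_millis))
    return days * 86400000 + millis_since_midnight


def epoch_millis(ts):
    """Reference value: exact integer milliseconds since the epoch."""
    return (ts - EPOCH) // timedelta(milliseconds=1)
```

For 2020-01-02 03:04:05.678 UTC, both functions give 18263 * 86400000 + 11045678 = 1577934245678.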

 

Example of SQL_GROUP_LIST using Oracle (assumes a single loader job):
Say you have a loader table that looks like the following:
 
CREATE TABLE myloadertable
(
    subject_id VARCHAR(255),
    group_name VARCHAR(1024)
);
 
With the following incremental table:
 
CREATE TABLE myincrementaltable
(
    id NUMBER NOT NULL,
    subject_id VARCHAR(255),
    subject_identifier VARCHAR(255),
    subject_id_or_identifier VARCHAR(255),
    source_id VARCHAR(255),
    loader_group_name VARCHAR(1024) NOT NULL,
    timestamp NUMBER NOT NULL,
    completed_timestamp NUMBER,
    PRIMARY KEY (id)
);
 
And a sequence for the primary key on the incremental table:
 
CREATE SEQUENCE myincrementaltable_seq;
 
And the following loader job:
addRootStem("test", "test")
addGroup("test", "owner", "owner")
groupAddType("test:owner", "grouperLoader")
setGroupAttr("test:owner", "grouperLoaderDbName", "grouper")
setGroupAttr("test:owner", "grouperLoaderType", "SQL_GROUP_LIST")
setGroupAttr("test:owner", "grouperLoaderScheduleType", "START_TO_START_INTERVAL")
setGroupAttr("test:owner", "grouperLoaderQuery", "select group_name, subject_id from myloadertable")
setGroupAttr("test:owner", "grouperLoaderIntervalSeconds", "86400")
setGroupAttr("test:owner", "grouperLoaderGroupsLike", "test:loader%")


Assuming this is on the same database as Grouper, you could add the following configuration in grouper-loader.properties (run every 5 seconds):
otherJob.incrementalLoader1.class = edu.internet2.middleware.grouper.app.loader.GrouperLoaderIncrementalJob
otherJob.incrementalLoader1.quartzCron = 0/5 * * * * ?
otherJob.incrementalLoader1.databaseName=grouper
otherJob.incrementalLoader1.tableName=myincrementaltable
 
And the following trigger:
 
CREATE OR REPLACE TRIGGER mytrigger
AFTER INSERT OR DELETE OR UPDATE ON myloadertable
FOR EACH ROW
DECLARE
  timemillis NUMBER;
BEGIN
  select extract(day from(sys_extract_utc(systimestamp) - to_timestamp('1970-01-01', 'YYYY-MM-DD'))) * 86400000 
     + to_number(to_char(sys_extract_utc(systimestamp), 'SSSSSFF3')) into timemillis from dual;
  IF (:new.subject_id is not null) THEN
    INSERT INTO myincrementaltable (id, subject_id, loader_group_name, timestamp) values (myincrementaltable_seq.nextval, :new.subject_id, 'test:owner', timemillis);
  END IF;
  IF (:old.subject_id is not null) THEN
    INSERT INTO myincrementaltable (id, subject_id, loader_group_name, timestamp) values (myincrementaltable_seq.nextval, :old.subject_id, 'test:owner', timemillis);
  END IF;
END;

 

Example of SQL_SIMPLE using MySQL (loader table has a group name field to allow multiple SQL_SIMPLE jobs):
Say you have a loader table that looks like the following:
 
CREATE TABLE myloadertable
(
    subject_id VARCHAR(255),
    group_name VARCHAR(1024)
);
 
With the following incremental table:
 
CREATE TABLE myincrementaltable
(
    id BIGINT NOT NULL AUTO_INCREMENT,
    subject_id VARCHAR(255),
    subject_identifier VARCHAR(255),
    subject_id_or_identifier VARCHAR(255),
    source_id VARCHAR(255),
    loader_group_name VARCHAR(1024) NOT NULL,
    timestamp BIGINT NOT NULL,
    completed_timestamp BIGINT,
    PRIMARY KEY (id)
);
  
And the following loader job:
addRootStem("test", "test")
addGroup("test", "loader1", "loader1")
groupAddType("test:loader1", "grouperLoader")
setGroupAttr("test:loader1", "grouperLoaderDbName", "grouper")
setGroupAttr("test:loader1", "grouperLoaderType", "SQL_SIMPLE")
setGroupAttr("test:loader1", "grouperLoaderScheduleType", "START_TO_START_INTERVAL")
setGroupAttr("test:loader1", "grouperLoaderQuery", "select subject_id from myloadertable where group_name = 'test:loader1'")
setGroupAttr("test:loader1", "grouperLoaderIntervalSeconds", "86400")
 
Assuming this is on the same database as Grouper, you could add the following configuration in grouper-loader.properties (run every 5 seconds):
otherJob.incrementalLoader1.class = edu.internet2.middleware.grouper.app.loader.GrouperLoaderIncrementalJob
otherJob.incrementalLoader1.quartzCron = 0/5 * * * * ?
otherJob.incrementalLoader1.databaseName=grouper
otherJob.incrementalLoader1.tableName=myincrementaltable
 
And the following trigger:

delimiter |
CREATE TRIGGER mytrigger_insert
AFTER INSERT ON myloadertable
FOR EACH ROW
BEGIN
  INSERT INTO myincrementaltable (subject_id, loader_group_name, timestamp) values (NEW.subject_id, NEW.group_name, ROUND(UNIX_TIMESTAMP(CURTIME(4)) * 1000));
END;
|
CREATE TRIGGER mytrigger_update
AFTER UPDATE ON myloadertable
FOR EACH ROW
BEGIN
  INSERT INTO myincrementaltable (subject_id, loader_group_name, timestamp) values (NEW.subject_id, NEW.group_name, ROUND(UNIX_TIMESTAMP(CURTIME(4)) * 1000));
  INSERT INTO myincrementaltable (subject_id, loader_group_name, timestamp) values (OLD.subject_id, OLD.group_name, ROUND(UNIX_TIMESTAMP(CURTIME(4)) * 1000));
END;
|
CREATE TRIGGER mytrigger_delete
AFTER DELETE ON myloadertable
FOR EACH ROW
BEGIN
  INSERT INTO myincrementaltable (subject_id, loader_group_name, timestamp) values (OLD.subject_id, OLD.group_name, ROUND(UNIX_TIMESTAMP(CURTIME(4)) * 1000));
END;
|
delimiter ;
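
If you want to see what the three triggers above write to the incremental table without touching a real database, the following Python sketch replays them against an in-memory SQLite database. SQLite is only a stand-in here: the trigger bodies are translated to SQLite syntax, the timestamp has one-second resolution (multiplied by 1000), the incremental table is reduced to the columns the triggers use, and the function name demo is hypothetical.

```python
import sqlite3

# SQLite translation of the loader table, incremental table and triggers.
SCHEMA = """
CREATE TABLE myloadertable (subject_id VARCHAR(255), group_name VARCHAR(1024));
CREATE TABLE myincrementaltable (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    subject_id VARCHAR(255),
    loader_group_name VARCHAR(1024) NOT NULL,
    timestamp BIGINT NOT NULL,
    completed_timestamp BIGINT
);
CREATE TRIGGER mytrigger_insert AFTER INSERT ON myloadertable FOR EACH ROW
BEGIN
  INSERT INTO myincrementaltable (subject_id, loader_group_name, timestamp)
  VALUES (NEW.subject_id, NEW.group_name, CAST(strftime('%s', 'now') AS INTEGER) * 1000);
END;
CREATE TRIGGER mytrigger_update AFTER UPDATE ON myloadertable FOR EACH ROW
BEGIN
  INSERT INTO myincrementaltable (subject_id, loader_group_name, timestamp)
  VALUES (NEW.subject_id, NEW.group_name, CAST(strftime('%s', 'now') AS INTEGER) * 1000);
  INSERT INTO myincrementaltable (subject_id, loader_group_name, timestamp)
  VALUES (OLD.subject_id, OLD.group_name, CAST(strftime('%s', 'now') AS INTEGER) * 1000);
END;
CREATE TRIGGER mytrigger_delete AFTER DELETE ON myloadertable FOR EACH ROW
BEGIN
  INSERT INTO myincrementaltable (subject_id, loader_group_name, timestamp)
  VALUES (OLD.subject_id, OLD.group_name, CAST(strftime('%s', 'now') AS INTEGER) * 1000);
END;
"""


def demo():
    """Insert, move and delete one membership; return the recorded changes."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.execute("INSERT INTO myloadertable VALUES ('test.subject.0', 'test:loader1')")
    conn.execute("UPDATE myloadertable SET group_name = 'test:loader2' "
                 "WHERE subject_id = 'test.subject.0'")
    conn.execute("DELETE FROM myloadertable WHERE subject_id = 'test.subject.0'")
    return conn.execute(
        "SELECT subject_id, loader_group_name FROM myincrementaltable ORDER BY id"
    ).fetchall()
```

The insert records one row, the update records two (the new and the old group, so both loader jobs re-check the subject), and the delete records one, for four rows in total.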
 
 
 
 
 
 
 
 