Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

This page provides sample code showing how to integrate develop with the Grouper Messaging System with another messaging system.

GSH to manage built in messaging

GSH to send / receive messages

Integrating Grouper messaging with a new messaging system (e.g. if an AcmeMQ adapter is not implemented, this is how to implement one)

  • Interface GrouperMessagingSystem defines a messaging system.  Implement this to integrate Grouper messaging with a messaging system
  • GrouperBuiltinMessagingSystem implements GrouperMessagingSystem - Grouper's implementation of the messaging interface
  • GrouperMessagingEngine is the Grouper API to send a message to various messaging systems

...

Code Block
################################
## Grouper Messaging System
################################

# name of messaging system which is the default
grouper.messaging.default.name.of.messaging.system = grouperBuiltinMessaging


# name of a messaging system.  note, "grouperBuiltinMessaging" can be arbitrary
grouper.messaging.system.grouperBuiltinMessaging.name = grouperBuiltinMessaging
# class that implements edu.internet2.middleware.grouperClient.messaging.GrouperMessagingSystem
grouper.messaging.system.grouperBuiltinMessaging.class = edu.internet2.middleware.grouper.messaging.GrouperBuiltinMessagingSystem


# name of a messaging system.  note, "myAwsMessagingSystem" can be arbitrary
# grouper.messaging.system.myAwsMessagingSystem.name = aws
# class that implements edu.internet2.middleware.grouperClient.messaging.GrouperMessagingSystem
# grouper.messaging.system.myAwsMessagingSystem.class = 

Configure Builtin queues/topicsThe messaging interface is:

Code Block
// create objects
GrouperBuiltinMessagingSystem.createQueue("abc");
GrouperBuiltinMessagingSystem.createTopic("def");

// permissions on objects
GrouperBuiltinMessagingSystem.allowSendToQueue("abc", SubjectTestHelper.SUBJ0);
GrouperBuiltinMessagingSystem.allowSendToTopic("abc", SubjectTestHelper.SUBJ0);
GrouperBuiltinMessagingSystem.allowReceiveFromQueue("abc", SubjectTestHelper.SUBJ0);


//link up topic with queue(s)
GrouperBuiltinMessagingSystem.topicSendsToQueue("def", "abc");

Use messaging

Code Block
// send a message
GrouperMessageSendResult grouperMessageSendResult = GrouperMessagingEngine.send(new GrouperMessageSendParam().assignGrouperMessageSystemName("someName").assignQueueOrTopic("abc").addMessageBody("message body"));


// receive messages
GrouperMessageReceiveResult grouperMessageReceiveResult = GrouperMessagingEngine.receive(new GrouperMessageReceiveParam().assignGrouperMessageSystemName("someName").assignQueueOrTopic("abc"));


// mark as processed
GrouperMessageProcessedResult grouperMessageProcessedResult = GrouperMessagingEngine.markAsProcessed(new GrouperMessageProcessedParam().assignGrouperMessageSystemName("someName").assignQueueOrTopic("abc").addGrouperMessage(grouperMessage));

Messaging listener

Code Block
#####################################
## Messaging listener
#####################################
# note, change "messagingListener" in key to be the name of the listener.  e.g. messaging.listener.myAzureListener.class
# extends edu.internet2.middleware.grouper.messaging.MessagingListenerBase
# this listener will just print out messages: edu.internet2.middleware.grouper.messaging.MessagingListenerPrint
#


#messaging.listener.messagingListener.class = edu.internet2.middleware.grouper.messaging.MessagingListener
#messaging.listener.messagingListener.quartzCron = 0 * * * * ?
#messaging.listener.messagingListener.messagingSystemName = grouperBuiltinMessaging
#messaging.listener.messagingListener.queue = abc
#messaging.listener.messagingListener.numberOfTriesPerIteration = 3
#messaging.listener.messagingListener.pollingTimeoutSeconds = 18
#messaging.listener.messagingListener.sleepSecondsInBetweenIterations = 0
#messaging.listener.messagingListener.maxMessagesToReceiveAtOnce = 20
# if there are 20 messages to receive at once, then do this 50 times per call max
#messaging.listener.messagingListener.maxOuterLoops = 50

Sample messaging listener implementation

Code Block
/**
 * @author mchyzer
 * $Id$
 */
package edu.internet2.middleware.grouper.messaging;
import java.util.Collection;
import edu.internet2.middleware.grouper.changeLog.ChangeLogEntry;
import edu.internet2.middleware.grouperClient.messaging.GrouperMessage;
import edu.internet2.middleware.grouperClient.messaging.GrouperMessageProcessedParam;
import edu.internet2.middleware.grouperClient.messaging.GrouperMessagingEngine;

/**
 *
 */
public class MessagingListenerPrint extends MessagingListenerBase {
  /**
   * **
 * 
 * @author mchyzer
 * $Id$
 */
package edu.internet2.middleware.grouperClient.messaging;

/**
 * Represents the methods that a messaging system
 * needs to support
 */
public interface GrouperMessagingSystem {
  /**
   * send a message to a queue name.  Note, the recipient could be a 
   * queue or a topic (generally always one or the other) based on the 
   * implementation of the messaging system.  Messages must be delievered
   * in the order that collection iterator designates.  If there is a problem
   * delivering the messages, the implementation should log, wait (back off)
   * and retry until it is successful.
   * @param grouperMessageSendParam has the queue or topic, and the message(s) and perhaps args
   * @return result
   */
  public GrouperMessageSendResult send(GrouperMessageSendParam grouperMessageSendParam);


  /**
   * this will generally block until there are messages to process.  These messages
   * are ordered in the order that they were sent.
   * @param grouperMessageReceiveParam grouper messaging receive param
   * @return a message or multiple messages.  It will block until there are messages
   * available for this recipient to process
   */
  public GrouperMessageReceiveResult MessagingListenerPrintreceive() {
  }GrouperMessageReceiveParam grouperMessageReceiveParam);


  /**
   * tell the @see edu.internet2.middleware.grouper.messaging.MessagingListenerBase#processMessages(java.lang.String, java.lang.String, java.util.Collection, edu.internet2.middleware.grouper.messaging.MessagingListenerMetadata)messaging system that these messages are processed
   */
 generally @Override
the message publicsystem String processMessages(String messageSystemName, String queue,will use the message id.  Note, the objects
   * sent to this Collection<GrouperMessage>method grouperMessageList,
must be the same that were MessagingListenerMetadatareceived messagingListenerMetadata)in {the
   * 
receiveMessages method.  If Stringthere lastIdProcessedis =a null;problem
   * fordelivering (GrouperMessagethe grouperMessagemessages, :the grouperMessageList)implementation {
should wait (back off)
   try* {
and retry until it is successful.  Alternatively 
the message should be 
   * Stringreturned jsonto = grouperMessage.getMessageBody();
        
        //try to convert to change log entryqueue, returned to end of queue, or sent to another queue
   *     try {@param grouperMessageAcknowledgeParam
   * @return result
   */
  Collection<ChangeLogEntry>public changeLogEntriesGrouperMessageAcknowledgeResult = ChangeLogEntry.fromJsonToCollection(json);
          for (ChangeLogEntry changeLogEntry : changeLogEntries) {
            System.out.println("Change log entry: " + changeLogEntry.getChangeLogType().getChangeLogCategory() +
                " -> " + changeLogEntry.getChangeLogType().getActionName() + ", " + changeLogEntry.getId());
          }
          System.out.println("Change log entry: " + json);
        } catch (Exception e) {
          System.out.println("Not change log entry: " + grouperMessage.getId() + ", " + json);
        }
        
        //mark it as processed
        GrouperMessagingEngine.markAsProcessed(new GrouperMessageProcessedParam()
          .assignQueue(queue).assignGropuerMessageSystemName(messageSystemName)acknowledge(GrouperMessageAcknowledgeParam grouperMessageAcknowledgeParam);


}

 

Messaging listener

A messaging listener is a daemon job which will check a message system queue for messages and act on them (calling an interface)

Code Block
#####################################
## Messaging listener using the messaging API
#####################################


# note, change "messagingListener" in key to be the name of the listener.  e.g. messaging.listener.myAzureListener.class
# extends edu.internet2.middleware.grouper.messaging.MessagingListenerBase
# this listener will just print out messages: edu.internet2.middleware.grouper.messaging.MessagingListenerPrint
#


#messaging.listener.messagingListener.class = edu.internet2.middleware.grouper.messaging.MessagingListener
#messaging.listener.messagingListener.quartzCron = 0 * * * * ?
#messaging.listener.messagingListener.messagingSystemName = grouperBuiltinMessaging
#messaging.listener.messagingListener.queueName = abc
#messaging.listener.messagingListener.numberOfTriesPerIteration = 3
#messaging.listener.messagingListener.pollingTimeoutSeconds = 18
#messaging.listener.messagingListener.sleepSecondsInBetweenIterations = 0
#messaging.listener.messagingListener.maxMessagesToReceiveAtOnce = 20
# if there are 20 messages to receive at once, then do this 50 times per call max
#messaging.listener.messagingListener.maxOuterLoops = 50



Sample messaging listener implementation

Code Block
/**
 * @author mchyzer
 * $Id$
 */
package edu.internet2.middleware.grouper.messaging;
import java.util.Collection;
import edu.internet2.middleware.grouper.changeLog.ChangeLogEntry;
import edu.internet2.middleware.grouperClient.messaging.GrouperMessage;
import edu.internet2.middleware.grouperClient.messaging.GrouperMessageAcknowledgeParam;
import edu.internet2.middleware.grouperClient.messaging.GrouperMessageAcknowledgeType;
import edu.internet2.middleware.grouperClient.messaging.GrouperMessagingEngine;

/**
 *
 */
public class MessagingListenerPrint extends MessagingListenerBase {
  /**
   * 
   */
  public MessagingListenerPrint() {
  }
  /**
   * @see edu.internet2.middleware.grouper.messaging.MessagingListenerBase#processMessages(java.lang.String, java.lang.String, java.util.Collection, edu.internet2.middleware.grouper.messaging.MessagingListenerMetadata)
   */
  @Override
  public void processMessages(String messageSystemName, String queue,
      Collection<GrouperMessage> grouperMessageList,
      MessagingListenerMetadata messagingListenerMetadata) {
    
    for (GrouperMessage grouperMessage : grouperMessageList) {
      try {
        
        String json = grouperMessage.getMessageBody();
        
        //try to convert to change log entry
        try {
          Collection<ChangeLogEntry> changeLogEntries = ChangeLogEntry.fromJsonToCollection(json);
          for (ChangeLogEntry changeLogEntry : changeLogEntries) {
            System.out.println("Change log entry: " + changeLogEntry.getChangeLogType().getChangeLogCategory() +
                " -> " + changeLogEntry.getChangeLogType().getActionName() + ", " + changeLogEntry.getId());
          }
          System.out.println("Change log entry: " + json);
        } catch (Exception e) {
          System.out.println("Not change log entry: " + grouperMessage.getId() + ", " + json);
        }
        
        //mark it as processed
        GrouperMessagingEngine.acknowledge(new GrouperMessageAcknowledgeParam()
          .assignAcknowledgeType(GrouperMessageAcknowledgeType.mark_as_processed)
          .assignQueueName(queue).assignGrouperMessageSystemName(messageSystemName)
          .addGrouperMessage(grouperMessage));
        
      } catch (Exception e) {
        messagingListenerMetadata.registerProblem(e, "Problem in message: " + grouperMessage.getId(), grouperMessage.getId());
        break;
      }
    }
  }
}



 

Messaging listener that uses ChangeLogConsumerBase implementations

If you have a change log consumer and you want to have it process messages, use this:

Code Block
#####################################
## Messaging listener using the change log consumer API
#####################################


# note, change "messagingListenerChangeLogConsumer" in key to be the name of the listener.  e.g. messaging.listener.myAzureListener.class
#
# keep this class to be MessagingListenerToChangeLogConsumer
#messaging.listener.messagingListenerChangeLogConsumer.class = edu.internet2.middleware.grouper.messaging.MessagingListenerToChangeLogConsumer
#messaging.listener.messagingListenerChangeLogConsumer.changeLogConsumerClass = edu.internet2.middleware.grouper.messaging.SomethingExtendsChangeLogConsumerBase
#messaging.listener.messagingListenerChangeLogConsumer.quartzCron = 0 * * * * ?
#messaging.listener.messagingListenerChangeLogConsumer.messagingSystemName = grouperBuiltinMessaging
#messaging.listener.messagingListenerChangeLogConsumer.queueName = abc
#messaging.listener.messagingListenerChangeLogConsumer.numberOfTriesPerIteration = 3
#messaging.listener.messagingListenerChangeLogConsumer.pollingTimeoutSeconds = 18
#messaging.listener.messagingListenerChangeLogConsumer.sleepSecondsInBetweenIterations = 0
#messaging.listener.messagingListenerChangeLogConsumer.maxMessagesToReceiveAtOnce = 20
# if there are 20 messages to receive at once, then do this 50 times per call max
#messaging.listener.messagingListenerChangeLogConsumer.maxOuterLoops = 50

 

Change log consumer that sends to messaging

This is a change log consumer that will send change log entries to a queue or topic with a JSON format that can be easily converted back to ChangeLogEntries

Code Block
#####################################
## Messaging integration with change log, send change log entries to a messaging system
#####################################
# note, change "messaging" in key to be the name of the consumer.  e.g. changeLog.consumer.myAzureConsumer.class
#changeLog.consumer.messaging.class = edu.internet2.middleware.grouper.changeLog.ChangeLogConsumerToMessage
#changeLog.consumer.messaging.quartzCron = 0 * * * * ?
#changeLog.consumer.messaging.messagingSystemName = grouperBuiltinMessaging
# queue or topic
#changeLog.consumer.messaging.messageQueueType = queue
#changeLog.consumer.messaging.queueOrTopicName = abc

Change log JSON

Code Block
{  
   "event":[  
      {  
          .addGrouperMessage(grouperMessage));
"changeLogTypeId":"7db1fb2d34944668bc7d4485f1c7854b",
         "contextId":"7e4ca9c1dfd14111a60c8b057e5ae5aa",
      } catch (Exception e) { "createdOnDb":1458577616745723,
        messagingListenerMetadata.registerProblem(e, "Problem in message: " + grouperMessage.getId(), grouperMessage.getId());
sequenceNumber":473,
         "changeLogTypeCategory":"privilege",
    break;
      }
"changeLogTypeAction":"addPrivilege",
       lastIdProcessed = grouperMessage.getId(); "field_id":"435e7190d1814d86a26e6bbd70e9bca3",
    }
    return lastIdProcessed;
  }
}

 

Change log consumer that sends to messaging

Code Block
#####################################
## Messaging integration with change log
#####################################

# note, change "messaging" in key to be the name of the consumer.  e.g. changeLog.consumer.myAzureConsumer.class
#changeLog.consumer.messaging.class = edu.internet2.middleware.grouper.changeLog.ChangeLogConsumerToMessage
#changeLog.consumer.messaging.quartzCron = 0 * * * * ?
#changeLog.consumer.messaging.messagingSystemName = grouperBuiltinMessaging
#changeLog.consumer.messaging.queueOrTopic = abc

Change log JSON

Code Block
{  
   "event":[  
 "field_privilegeName":"read",
         "field_subjectId":"GrouperAll",
         "field_sourceId":"g:isa",
         "field_privilegeType":"access",
         "field_ownerType":"group",
         "field_ownerId":"7d09cfff2b444db696a0771b224081ef",
        {   "field_ownerName":"test:testGroup3",
         "changeLogTypeIdfield_memberId":"7db1fb2d34944668bc7d4485f1c7854be6e154ea21f64c0d9ecb9f4137ab1ac3",
         "contextIdfield_fieldId":"7e4ca9c1dfd14111a60c8b057e5ae5aa5da0071e39ff4fb2a17cec73ac25efb3",
         "createdOnDb":1458577616745723,field_membershipType":"flattened"
      }
   ]
}

Convert Change Log Entry to and from json

Code Block
String json = changeLogEntry.toJson(true);
ChangeLogEntry newEntry  "sequenceNumber":473,
         "changeLogTypeCategory":"privilege",
         "changeLogTypeAction":"addPrivilege",
         "field_id":"435e7190d1814d86a26e6bbd70e9bca3",
         "field_privilegeName":"read",
         "field_subjectId":"GrouperAll",
         "field_sourceId":"g:isa",
         "field_privilegeType":"access",
         "field_ownerType":"group",
         "field_ownerId":"7d09cfff2b444db696a0771b224081ef",
         "field_ownerName":"test:testGroup3",
         "field_memberId":"e6e154ea21f64c0d9ecb9f4137ab1ac3",
         "field_fieldId":"5da0071e39ff4fb2a17cec73ac25efb3",
         "field_membershipType":"flattened"
      }
   ]
}

Convert Change Log Entry to and from json

Code Block
String json = changeLogEntry.toJson(true);
ChangeLogEntry newEntry = ChangeLogEntry.fromJsonToCollection(json).iterator().next();

 

Messaging ESB change log consumer, configure in grouper-loader.properties

= ChangeLogEntry.fromJsonToCollection(json).iterator().next();

 

Messaging ESB change log consumer, configure in grouper-loader.properties

Code Block
#####################################
## Messaging integration with ESB, send change log entries to a messaging system
#####################################
# note, change "messagingEsb" in key to be the name of the consumer.  e.g. changeLog.consumer.myAzureConsumer.class
#changeLog.consumer.messagingEsb.class = edu.internet2.middleware.grouper.changeLog.esb.consumer.EsbConsumer
#changeLog.consumer.messagingEsb.quartzCron = 0 * * * * ?
#changeLog.consumer.messagingEsb.elfilter = event.eventType eq 'GROUP_DELETE' || event.eventType eq 'GROUP_ADD' || event.eventType eq 'MEMBERSHIP_DELETE' || event.eventType eq 'MEMBERSHIP_ADD'
#changeLog.consumer.messagingEsb.publisher.class = edu.internet2.middleware.grouper.changeLog.esb.consumer.EsbMessagingPublisher
#changeLog.consumer.messagingEsb.publisher.messagingSystemName = grouperBuiltinMessaging
# queue or topic
#changeLog.consumer.messagingEsb.messageQueueType = queue
#changeLog.consumer.messagingEsb.publisher.queueOrTopicName = abc

 

Configure builtin messaging cleanup jobs in grouper-loader.properties

Code Block
##################################
## grouper builtin messaging cleanup cron
##################################


#quartz cron-like schedule for grouper messaging daemon.
#leave blank to disable this, the default is every hour, 10 minutes after the hour 
#this daemon does cleanup on the builtin messaging table
changeLog.builtinMessagingDaemon.quartz.cron = 0 10 * * * ?


# after three days of not consuming messages, delete them, if -1, dont run this daemon
grouper.builtin.messaging.deleteAllMessagesMoreThanHoursOld = 72


# after three hours of having processed messages, delete them.  Note, if this is -1 just delete when marking processed
grouper.builtin.messaging.deleteProcessedMessagesMoreThanMinutesOld = 180


Test messaging

Run the test: edu.internet2.middleware.grouper.messaging.GrouperBuiltinMessagingSystemTest

Set this in log4j.properties to see debug and performance info:

Code Block
log4j.logger.
Code Block
#####################################
## Messaging integration
#####################################
# note, change "messagingEsb" in key to be the name of the consumer.  e.g. changeLog.consumer.myAzureConsumer.class
#changeLog.consumer.messagingEsb.class = edu.internet2.middleware.grouper.changeLog.esb.consumer.EsbConsumer
#changeLog.consumer.messagingEsb.quartzCron = 0 * * * * ?
#changeLog.consumer.messagingEsb.elfilter = event.eventType eq 'GROUP_DELETE' || event.eventType eq 'GROUP_ADD' || event.eventType eq 'MEMBERSHIP_DELETE' || event.eventType eq 'MEMBERSHIP_ADD'
#changeLog.consumer.messagingEsb.publisher.class = edu.internet2.middleware.grouper.changeLog.esb.consumer.EsbMessagingPublisher
#changeLog.consumer.messagingEsb.publisher.messagingSystemName = grouperBuiltinMessaging
#changeLog.consumer.messagingEsb.publisher.queueOrTopic = abc

 

Configure builtin messaging cleanup jobs in grouper-loader.properties

messaging.GrouperBuiltinMessagingSystemTest = DEBUG

Example to get messages

Code Block
    String messageSystemConfigName = "someConfigName";
    String queueName = "someQueue";
    GrouperMessageReceiveResult grouperMessageReceiveResult = GrouperMessagingEngine.receive(new GrouperMessageReceiveParam()
    .assignGrouperMessageSystemName(messageSystemConfigName)
    .assignGrouperMessageQueueParam(new GrouperMessageQueueParam().assignQueueOrTopicName(queueName).assignQueueType(GrouperMessageQueueType.queue))
    .assignMaxMessagesToReceiveAtOnce(20));

    for (GrouperMessage grouperMessage : grouperMessageReceiveResult.getGrouperMessages()) {
      String body = grouperMessage.getMessageBody();
      //do something with message
      GrouperMessagingEngine.acknowledge(new GrouperMessageAcknowledgeParam()
      .assignGrouperMessageSystemName(messageSystemConfigName)
      .assignGrouperMessageQueueParam(new GrouperMessageQueueParam().assignQueueOrTopicName(queueName).assignQueueType(GrouperMessageQueueType.queue))
      .assignAcknowledgeType(GrouperMessageAcknowledgeType.mark_as_processed)
      .assignGrouperMessages(GrouperClientUtils.toSet(grouperMessage)));
    }
Code Block
##################################
## grouper builtin messaging cleanup cron
##################################

#quartz cron-like schedule for grouper messaging daemon.
#leave blank to disable this, the default is every hour, 10 minutes after the hour 
#this daemon does cleanup on the builtin messaging table
changeLog.builtinMessagingDaemon.quartz.cron = 0 10 * * * ?


# after three days of not consuming messages, delete them, if -1, dont run this daemon
grouper.builtin.messaging.deleteAllMessagesMoreThanHoursOld = 72


# after three hours of having processed messages, delete them.  Note, if this is -1 just delete when marking processed
grouper.builtin.messaging.deleteProcessedMessagesMoreThanMinutesOld = 180

 

See Also

Grouper Messaging System

Grouper Built In Messaging

...