...
This page provides sample code showing how to develop with the Grouper Messaging System and how to integrate it with another messaging system.
GSH to manage built in messaging
GSH to send / receive messages
Integrating Grouper messaging with a new messaging system (e.g. if an AcmeMQ adapter is not implemented, this is how to implement one)
- Interface GrouperMessagingSystem defines a messaging system. Implement this to integrate Grouper messaging with a messaging system
- GrouperBuiltinMessagingSystem implements GrouperMessagingSystem - Grouper's implementation of the messaging interface
- GrouperMessagingEngine is the Grouper API to send a message to various messaging systems
...
Code Block |
---|
################################
## Grouper Messaging System
################################
# name of messaging system which is the default
grouper.messaging.default.name.of.messaging.system = grouperBuiltinMessaging
# name of a messaging system. note, "grouperBuiltinMessaging" can be arbitrary
grouper.messaging.system.grouperBuiltinMessaging.name = grouperBuiltinMessaging
# class that implements edu.internet2.middleware.grouperClient.messaging.GrouperMessagingSystem
grouper.messaging.system.grouperBuiltinMessaging.class = edu.internet2.middleware.grouper.messaging.GrouperBuiltinMessagingSystem
# name of a messaging system. note, "myAwsMessagingSystem" can be arbitrary
# grouper.messaging.system.myAwsMessagingSystem.name = aws
# class that implements edu.internet2.middleware.grouperClient.messaging.GrouperMessagingSystem
# grouper.messaging.system.myAwsMessagingSystem.class =
|
Configure Builtin queues/topicsThe messaging interface is:
Code Block |
---|
// create objects: one queue ("abc") and one topic ("def")
GrouperBuiltinMessagingSystem.createQueue("abc");
GrouperBuiltinMessagingSystem.createTopic("def");
// permissions on objects
GrouperBuiltinMessagingSystem.allowSendToQueue("abc", SubjectTestHelper.SUBJ0);
// the topic created above is "def" ("abc" is the queue), so the topic grant must name "def"
GrouperBuiltinMessagingSystem.allowSendToTopic("def", SubjectTestHelper.SUBJ0);
GrouperBuiltinMessagingSystem.allowReceiveFromQueue("abc", SubjectTestHelper.SUBJ0);
//link up topic with queue(s)
GrouperBuiltinMessagingSystem.topicSendsToQueue("def", "abc"); |
Use messaging
Code Block |
---|
// send a message: one message with body "message body" goes to the queue or topic "abc"
// of the messaging system registered under the name "someName"
GrouperMessageSendResult grouperMessageSendResult = GrouperMessagingEngine.send(new GrouperMessageSendParam().assignGrouperMessageSystemName("someName").assignQueueOrTopic("abc").addMessageBody("message body"));
// receive messages from "abc" on the same messaging system
GrouperMessageReceiveResult grouperMessageReceiveResult = GrouperMessagingEngine.receive(new GrouperMessageReceiveParam().assignGrouperMessageSystemName("someName").assignQueueOrTopic("abc"));
// mark as processed
// NOTE(review): grouperMessage is not declared in this snippet; presumably it is one of the
// messages obtained from grouperMessageReceiveResult above -- confirm
GrouperMessageProcessedResult grouperMessageProcessedResult = GrouperMessagingEngine.markAsProcessed(new GrouperMessageProcessedParam().assignGrouperMessageSystemName("someName").assignQueueOrTopic("abc").addGrouperMessage(grouperMessage));
|
Messaging listener
Code Block |
---|
#####################################
## Messaging listener
#####################################
# note, change "messagingListener" in key to be the name of the listener. e.g. messaging.listener.myAzureListener.class
# extends edu.internet2.middleware.grouper.messaging.MessagingListenerBase
# this listener will just print out messages: edu.internet2.middleware.grouper.messaging.MessagingListenerPrint
#
#messaging.listener.messagingListener.class = edu.internet2.middleware.grouper.messaging.MessagingListener
#messaging.listener.messagingListener.quartzCron = 0 * * * * ?
#messaging.listener.messagingListener.messagingSystemName = grouperBuiltinMessaging
#messaging.listener.messagingListener.queue = abc
#messaging.listener.messagingListener.numberOfTriesPerIteration = 3
#messaging.listener.messagingListener.pollingTimeoutSeconds = 18
#messaging.listener.messagingListener.sleepSecondsInBetweenIterations = 0
#messaging.listener.messagingListener.maxMessagesToReceiveAtOnce = 20
# if there are 20 messages to receive at once, then do this 50 times per call max
#messaging.listener.messagingListener.maxOuterLoops = 50
|
Messaging system interface (GrouperMessagingSystem)
Code Block |
---|
/**
 * @author mchyzer
 * $Id$
 */
package edu.internet2.middleware.grouperClient.messaging;

/**
 * Represents the methods that a messaging system
 * needs to support
 */
public interface GrouperMessagingSystem {

  /**
   * send a message to a queue name. Note, the recipient could be a
   * queue or a topic (generally always one or the other) based on the
   * implementation of the messaging system. Messages must be delivered
   * in the order that collection iterator designates. If there is a problem
   * delivering the messages, the implementation should log, wait (back off)
   * and retry until it is successful.
   * @param grouperMessageSendParam has the queue or topic, and the message(s) and perhaps args
   * @return result
   */
  public GrouperMessageSendResult send(GrouperMessageSendParam grouperMessageSendParam);

  /**
   * this will generally block until there are messages to process. These messages
   * are ordered in the order that they were sent.
   * @param grouperMessageReceiveParam grouper messaging receive param
   * @return a message or multiple messages. It will block until there are messages
   * available for this recipient to process
   */
  public GrouperMessageReceiveResult receive(GrouperMessageReceiveParam grouperMessageReceiveParam);

  /**
   * tell the messaging system that these messages are processed
   * generally the message system will use the message id. Note, the objects
   * sent to this method must be the same that were received in the
   * receiveMessages method. If there is a problem
   * delivering the messages, the implementation should wait (back off)
   * and retry until it is successful. Alternatively the message should be
   * returned to queue, returned to end of queue, or sent to another queue
   * @param grouperMessageAcknowledgeParam
   * @return result
   */
  public GrouperMessageAcknowledgeResult acknowledge(GrouperMessageAcknowledgeParam grouperMessageAcknowledgeParam);
} |
Messaging listener
A messaging listener is a daemon job which will check a message system queue for messages and act on them (calling an interface)
Code Block |
---|
#####################################
## Messaging listener using the messaging API
#####################################
# note, change "messagingListener" in key to be the name of the listener. e.g. messaging.listener.myAzureListener.class
# extends edu.internet2.middleware.grouper.messaging.MessagingListenerBase
# this listener will just print out messages: edu.internet2.middleware.grouper.messaging.MessagingListenerPrint
#
#messaging.listener.messagingListener.class = edu.internet2.middleware.grouper.messaging.MessagingListener
#messaging.listener.messagingListener.quartzCron = 0 * * * * ?
#messaging.listener.messagingListener.messagingSystemName = grouperBuiltinMessaging
#messaging.listener.messagingListener.queueName = abc
#messaging.listener.messagingListener.numberOfTriesPerIteration = 3
#messaging.listener.messagingListener.pollingTimeoutSeconds = 18
#messaging.listener.messagingListener.sleepSecondsInBetweenIterations = 0
#messaging.listener.messagingListener.maxMessagesToReceiveAtOnce = 20
# if there are 20 messages to receive at once, then do this 50 times per call max
#messaging.listener.messagingListener.maxOuterLoops = 50
|
Sample messaging listener implementation
Code Block |
---|
/**
* @author mchyzer
* $Id$
*/
package edu.internet2.middleware.grouper.messaging;
import java.util.Collection;
import edu.internet2.middleware.grouper.changeLog.ChangeLogEntry;
import edu.internet2.middleware.grouperClient.messaging.GrouperMessage;
import edu.internet2.middleware.grouperClient.messaging.GrouperMessageAcknowledgeParam;
import edu.internet2.middleware.grouperClient.messaging.GrouperMessageAcknowledgeType;
import edu.internet2.middleware.grouperClient.messaging.GrouperMessagingEngine;
/**
 * Sample messaging listener: prints every message it is handed to standard out
 * and then acknowledges that message as processed.
 */
public class MessagingListenerPrint extends MessagingListenerBase {

  /**
   * No-arg constructor.
   */
  public MessagingListenerPrint() {
  }

  /**
   * Prints each message in turn and acknowledges it with the mark_as_processed type.
   * If handling a message throws, the problem is registered on the listener metadata
   * and the remaining messages in this batch are left unhandled.
   *
   * @param messageSystemName name of the messaging system, passed through to the acknowledge call
   * @param queue queue name, passed through to the acknowledge call
   * @param grouperMessageList messages to print and acknowledge
   * @param messagingListenerMetadata receives the problem if a message fails
   * @see edu.internet2.middleware.grouper.messaging.MessagingListenerBase#processMessages(java.lang.String, java.lang.String, java.util.Collection, edu.internet2.middleware.grouper.messaging.MessagingListenerMetadata)
   */
  @Override
  public void processMessages(String messageSystemName, String queue,
      Collection<GrouperMessage> grouperMessageList,
      MessagingListenerMetadata messagingListenerMetadata) {

    for (GrouperMessage message : grouperMessageList) {
      try {
        String body = message.getMessageBody();

        // the body may or may not be change log JSON; print it either way
        try {
          Collection<ChangeLogEntry> entries = ChangeLogEntry.fromJsonToCollection(body);
          for (ChangeLogEntry entry : entries) {
            String line = "Change log entry: " + entry.getChangeLogType().getChangeLogCategory()
                + " -> " + entry.getChangeLogType().getActionName() + ", " + entry.getId();
            System.out.println(line);
          }
          System.out.println("Change log entry: " + body);
        } catch (Exception notChangeLog) {
          System.out.println("Not change log entry: " + message.getId() + ", " + body);
        }

        // tell the messaging system this message is done
        GrouperMessagingEngine.acknowledge(new GrouperMessageAcknowledgeParam()
            .assignAcknowledgeType(GrouperMessageAcknowledgeType.mark_as_processed)
            .assignQueueName(queue).assignGrouperMessageSystemName(messageSystemName)
            .addGrouperMessage(message));

      } catch (Exception problem) {
        messagingListenerMetadata.registerProblem(problem, "Problem in message: " + message.getId(), message.getId());
        // nothing follows the loop, so returning here is the same as breaking out of it
        return;
      }
    }
  }
}
|
Messaging listener that uses ChangeLogConsumerBase implementations
If you have a change log consumer and you want to have it process messages, use this:
Code Block |
---|
#####################################
## Messaging listener using the change log consumer API
#####################################
# note, change "messagingListenerChangeLogConsumer" in key to be the name of the listener. e.g. messaging.listener.myAzureListener.class
#
# keep this class to be MessagingListenerToChangeLogConsumer
#messaging.listener.messagingListenerChangeLogConsumer.class = edu.internet2.middleware.grouper.messaging.MessagingListenerToChangeLogConsumer
#messaging.listener.messagingListenerChangeLogConsumer.changeLogConsumerClass = edu.internet2.middleware.grouper.messaging.SomethingExtendsChangeLogConsumerBase
#messaging.listener.messagingListenerChangeLogConsumer.quartzCron = 0 * * * * ?
#messaging.listener.messagingListenerChangeLogConsumer.messagingSystemName = grouperBuiltinMessaging
#messaging.listener.messagingListenerChangeLogConsumer.queueName = abc
#messaging.listener.messagingListenerChangeLogConsumer.numberOfTriesPerIteration = 3
#messaging.listener.messagingListenerChangeLogConsumer.pollingTimeoutSeconds = 18
#messaging.listener.messagingListenerChangeLogConsumer.sleepSecondsInBetweenIterations = 0
#messaging.listener.messagingListenerChangeLogConsumer.maxMessagesToReceiveAtOnce = 20
# if there are 20 messages to receive at once, then do this 50 times per call max
#messaging.listener.messagingListenerChangeLogConsumer.maxOuterLoops = 50
|
Change log consumer that sends to messaging
This is a change log consumer that will send change log entries to a queue or topic with a JSON format that can be easily converted back to ChangeLogEntries
Code Block |
---|
#####################################
## Messaging integration with change log, send change log entries to a messaging system
#####################################
# note, change "messaging" in key to be the name of the consumer. e.g. changeLog.consumer.myAzureConsumer.class
#changeLog.consumer.messaging.class = edu.internet2.middleware.grouper.changeLog.ChangeLogConsumerToMessage
#changeLog.consumer.messaging.quartzCron = 0 * * * * ?
#changeLog.consumer.messaging.messagingSystemName = grouperBuiltinMessaging
# queue or topic
#changeLog.consumer.messaging.messageQueueType = queue
#changeLog.consumer.messaging.queueOrTopicName = abc
|
Change log JSON
Code Block |
---|
{
  "event":[
    {
      "changeLogTypeId":"7db1fb2d34944668bc7d4485f1c7854b",
      "contextId":"7e4ca9c1dfd14111a60c8b057e5ae5aa",
      "createdOnDb":1458577616745723,
      "sequenceNumber":473,
      "changeLogTypeCategory":"privilege",
      "changeLogTypeAction":"addPrivilege",
      "field_id":"435e7190d1814d86a26e6bbd70e9bca3",
      "field_privilegeName":"read",
      "field_subjectId":"GrouperAll",
      "field_sourceId":"g:isa",
      "field_privilegeType":"access",
      "field_ownerType":"group",
      "field_ownerId":"7d09cfff2b444db696a0771b224081ef",
      "field_ownerName":"test:testGroup3",
      "field_memberId":"e6e154ea21f64c0d9ecb9f4137ab1ac3",
      "field_fieldId":"5da0071e39ff4fb2a17cec73ac25efb3",
      "field_membershipType":"flattened"
    }
  ]
} |
Change log consumer that sends to messaging
Code Block |
---|
#####################################
## Messaging integration with change log
#####################################
# note, change "messaging" in key to be the name of the consumer. e.g. changeLog.consumer.myAzureConsumer.class
#changeLog.consumer.messaging.class = edu.internet2.middleware.grouper.changeLog.ChangeLogConsumerToMessage
#changeLog.consumer.messaging.quartzCron = 0 * * * * ?
#changeLog.consumer.messaging.messagingSystemName = grouperBuiltinMessaging
#changeLog.consumer.messaging.queueOrTopic = abc
|
Change log JSON
Code Block |
---|
{
  "event":[
    {
      "changeLogTypeId":"7db1fb2d34944668bc7d4485f1c7854b",
      "contextId":"7e4ca9c1dfd14111a60c8b057e5ae5aa",
      "createdOnDb":1458577616745723,
      "sequenceNumber":473,
      "changeLogTypeCategory":"privilege",
      "changeLogTypeAction":"addPrivilege",
      "field_id":"435e7190d1814d86a26e6bbd70e9bca3",
      "field_privilegeName":"read",
      "field_subjectId":"GrouperAll",
      "field_sourceId":"g:isa",
      "field_privilegeType":"access",
      "field_ownerType":"group",
      "field_ownerId":"7d09cfff2b444db696a0771b224081ef",
      "field_ownerName":"test:testGroup3",
      "field_memberId":"e6e154ea21f64c0d9ecb9f4137ab1ac3",
      "field_fieldId":"5da0071e39ff4fb2a17cec73ac25efb3",
      "field_membershipType":"flattened"
    }
  ]
} |
Convert Change Log Entry to and from json
Code Block |
---|
String json = changeLogEntry.toJson(true);
ChangeLogEntry newEntry = ChangeLogEntry.fromJsonToCollection(json).iterator().next(); |
Convert Change Log Entry to and from json
Code Block |
---|
// serialize an existing change log entry to its JSON form
// NOTE(review): the meaning of the boolean argument to toJson is not shown on this page -- confirm against the ChangeLogEntry javadoc
String json = changeLogEntry.toJson(true);
// parse the JSON back; the parser returns a collection of entries, so take the first one
ChangeLogEntry newEntry = ChangeLogEntry.fromJsonToCollection(json).iterator().next(); |
Messaging ESB change log consumer, configure in grouper-loader.properties
= ChangeLogEntry.fromJsonToCollection(json).iterator().next(); |
Messaging ESB change log consumer, configure in grouper-loader.properties
Code Block |
---|
#####################################
## Messaging integration with ESB, send change log entries to a messaging system
#####################################
# note, change "messagingEsb" in key to be the name of the consumer. e.g. changeLog.consumer.myAzureConsumer.class
#changeLog.consumer.messagingEsb.class = edu.internet2.middleware.grouper.changeLog.esb.consumer.EsbConsumer
#changeLog.consumer.messagingEsb.quartzCron = 0 * * * * ?
#changeLog.consumer.messagingEsb.elfilter = event.eventType eq 'GROUP_DELETE' || event.eventType eq 'GROUP_ADD' || event.eventType eq 'MEMBERSHIP_DELETE' || event.eventType eq 'MEMBERSHIP_ADD'
#changeLog.consumer.messagingEsb.publisher.class = edu.internet2.middleware.grouper.changeLog.esb.consumer.EsbMessagingPublisher
#changeLog.consumer.messagingEsb.publisher.messagingSystemName = grouperBuiltinMessaging
# queue or topic
#changeLog.consumer.messagingEsb.messageQueueType = queue
#changeLog.consumer.messagingEsb.publisher.queueOrTopicName = abc
|
Configure builtin messaging cleanup jobs in grouper-loader.properties
Code Block |
---|
##################################
## grouper builtin messaging cleanup cron
##################################
#quartz cron-like schedule for grouper messaging daemon.
#leave blank to disable this, the default is every hour, 10 minutes after the hour
#this daemon does cleanup on the builtin messaging table
changeLog.builtinMessagingDaemon.quartz.cron = 0 10 * * * ?
# after three days of not consuming messages, delete them, if -1, dont run this daemon
grouper.builtin.messaging.deleteAllMessagesMoreThanHoursOld = 72
# after three hours of having processed messages, delete them. Note, if this is -1 just delete when marking processed
grouper.builtin.messaging.deleteProcessedMessagesMoreThanMinutesOld = 180
|
Test messaging
Run the test: edu.internet2.middleware.grouper.messaging.GrouperBuiltinMessagingSystemTest
Set this in log4j.properties to see debug and performance info:
Code Block |
---|
log4j.logger.edu.internet2.middleware.grouper.messaging.GrouperBuiltinMessagingSystemTest = DEBUG |
Code Block |
##################################### ## Messaging integration ##################################### # note, change "messagingEsb" in key to be the name of the consumer. e.g. changeLog.consumer.myAzureConsumer.class #changeLog.consumer.messagingEsb.class = edu.internet2.middleware.grouper.changeLog.esb.consumer.EsbConsumer #changeLog.consumer.messagingEsb.quartzCron = 0 * * * * ? #changeLog.consumer.messagingEsb.elfilter = event.eventType eq 'GROUP_DELETE' || event.eventType eq 'GROUP_ADD' || event.eventType eq 'MEMBERSHIP_DELETE' || event.eventType eq 'MEMBERSHIP_ADD' #changeLog.consumer.messagingEsb.publisher.class = edu.internet2.middleware.grouper.changeLog.esb.consumer.EsbMessagingPublisher #changeLog.consumer.messagingEsb.publisher.messagingSystemName = grouperBuiltinMessaging #changeLog.consumer.messagingEsb.publisher.queueOrTopic = abc |
Configure builtin messaging cleanup jobs in grouper-loader.properties
messaging.GrouperBuiltinMessagingSystemTest = DEBUG |
Example to get messages
Code Block |
---|
// which configured messaging system to talk to, and which queue to read from
String configName = "someConfigName";
String queue = "someQueue";

// ask for up to 20 messages from the queue
GrouperMessageReceiveResult receiveResult = GrouperMessagingEngine.receive(
    new GrouperMessageReceiveParam()
        .assignGrouperMessageSystemName(configName)
        .assignGrouperMessageQueueParam(
            new GrouperMessageQueueParam().assignQueueOrTopicName(queue).assignQueueType(GrouperMessageQueueType.queue))
        .assignMaxMessagesToReceiveAtOnce(20));

for (GrouperMessage message : receiveResult.getGrouperMessages()) {
  String messageBody = message.getMessageBody();
  //do something with message

  // acknowledge this one message as processed on the same system and queue
  GrouperMessagingEngine.acknowledge(
      new GrouperMessageAcknowledgeParam()
          .assignGrouperMessageSystemName(configName)
          .assignGrouperMessageQueueParam(
              new GrouperMessageQueueParam().assignQueueOrTopicName(queue).assignQueueType(GrouperMessageQueueType.queue))
          .assignAcknowledgeType(GrouperMessageAcknowledgeType.mark_as_processed)
          .assignGrouperMessages(GrouperClientUtils.toSet(message)));
} |
Code Block |
##################################
## grouper builtin messaging cleanup cron
##################################
#quartz cron-like schedule for grouper messaging daemon.
#leave blank to disable this, the default is every hour, 10 minutes after the hour
#this daemon does cleanup on the builtin messaging table
changeLog.builtinMessagingDaemon.quartz.cron = 0 10 * * * ?
# after three days of not consuming messages, delete them, if -1, dont run this daemon
grouper.builtin.messaging.deleteAllMessagesMoreThanHoursOld = 72
# after three hours of having processed messages, delete them. Note, if this is -1 just delete when marking processed
grouper.builtin.messaging.deleteProcessedMessagesMoreThanMinutesOld = 180
|
See Also
...