...
Code Block |
---|
/** * * @author mchyzer * $Id$ */ package edu.internet2.middleware.grouperClient.messaging; /** * Represents the methods that a messaging system * needs to support */ public interface GrouperMessagingSystem { /** * send a message to a queue name. Note, the recipient could be a * queue or a topic (generally always one or the other) based on the * implementation of the messaging system. Messages must be delievered * in the order that collection iterator designates. If there is a problem * delivering the messages, the implementation should log, wait (back off) * and retry until it is successful. * @param grouperMessageSendParam has the queue or topic, and the message(s) and perhaps args * @return result */ public GrouperMessageSendResult send(GrouperMessageSendParam grouperMessageSendParam); /** * this will generally block until there are messages to process. These messages * are ordered in the order that they were sent. * @param grouperMessageReceiveParam grouper messaging receive param * @return a message or multiple messages. It will block until there are messages * available for this recipient to process */ public GrouperMessageReceiveResult receive(GrouperMessageReceiveParam grouperMessageReceiveParam); /** * tell the messaging system that these messages are processed * generally the message system will use the message id. Note, the objects * sent to this method must be the same that were received in the * receiveMessages method. If there is a problem * delivering the messages, the implementation should wait (back off) * and retry until it is successful. Alternatively the message should be * returned to queue, returned to end of queue, or sent to another queue * @param grouperMessageAcknowledgeParam * @return result */ public GrouperMessageAcknowledgeResult acknowledge(GrouperMessageAcknowledgeParam grouperMessageAcknowledgeParam); } |
Messaging listener
A messaging listener is a daemon job which will check a message system queue for messages and act on them (calling an interface)
Code Block |
---|
##################################### ## Messaging listener using the messaging API ##################################### # note, change "messagingListener" in key to be the name of the listener. e.g. messaging.listener.myAzureListener.class # extends edu.internet2.middleware.grouper.messaging.MessagingListenerBase # this listener will just print out messages: edu.internet2.middleware.grouper.messaging.MessagingListenerPrint # #messaging.listener.messagingListener.class = edu.internet2.middleware.grouper.messaging.MessagingListener #messaging.listener.messagingListener.quartzCron = 0 * * * * ? #messaging.listener.messagingListener.messagingSystemName = grouperBuiltinMessaging #messaging.listener.messagingListener.queuequeueName = abc #messaging.listener.messagingListener.numberOfTriesPerIteration = 3 #messaging.listener.messagingListener.pollingTimeoutSeconds = 18 #messaging.listener.messagingListener.sleepSecondsInBetweenIterations = 0 #messaging.listener.messagingListener.maxMessagesToReceiveAtOnce = 20 # if there are 20 messages to receive at once, then do this 50 times per call max #messaging.listener.messagingListener.maxOuterLoops = 50 |
Sample messaging listener implementation
Code Block |
---|
/** * @author mchyzer * $Id$ */ package edu.internet2.middleware.grouper.messaging; import java.util.Collection; import edu.internet2.middleware.grouper.changeLog.ChangeLogEntry; import edu.internet2.middleware.grouperClient.messaging.GrouperMessage; import edu.internet2.middleware.grouperClient.messaging.GrouperMessageProcessedParamGrouperMessageAcknowledgeParam; import edu.internet2.middleware.grouperClient.messaging.GrouperMessageAcknowledgeType; import edu.internet2.middleware.grouperClient.messaging.GrouperMessagingEngine; /** * */ public class MessagingListenerPrint extends MessagingListenerBase { /** * */ public MessagingListenerPrint() { } /** * @see edu.internet2.middleware.grouper.messaging.MessagingListenerBase#processMessages(java.lang.String, java.lang.String, java.util.Collection, edu.internet2.middleware.grouper.messaging.MessagingListenerMetadata) */ @Override public Stringvoid processMessages(String messageSystemName, String queue, Collection<GrouperMessage> grouperMessageList, MessagingListenerMetadata messagingListenerMetadata) { String lastIdProcessed = null; for (GrouperMessage grouperMessage : grouperMessageList) { try { String json = grouperMessage.getMessageBody(); //try to convert to change log entry try { Collection<ChangeLogEntry> changeLogEntries = ChangeLogEntry.fromJsonToCollection(json); for (ChangeLogEntry changeLogEntry : changeLogEntries) { System.out.println("Change log entry: " + changeLogEntry.getChangeLogType().getChangeLogCategory() + " -> " + changeLogEntry.getChangeLogType().getActionName() + ", " + changeLogEntry.getId()); } System.out.println("Change log entry: " + json); } catch (Exception e) { System.out.println("Not change log entry: " + grouperMessage.getId() + ", " + json); } //mark it as processed GrouperMessagingEngine.markAsProcessedacknowledge(new GrouperMessageProcessedParam( GrouperMessageAcknowledgeParam() .assignAcknowledgeType(GrouperMessageAcknowledgeType.mark_as_processed) .assignQueueassignQueueName(queue).assignGropuerMessageSystemNameassignGrouperMessageSystemName(messageSystemName) .addGrouperMessage(grouperMessage)); } catch (Exception e) { messagingListenerMetadata.registerProblem(e, "Problem in message: " + grouperMessage.getId(), grouperMessage.getId()); break; } } lastIdProcessed = grouperMessage.getId(); } return lastIdProcessed; } }} } |
Change log consumer that sends to messaging
This is a change log consumer that will send change log entries to a queue or topic with a JSON format that can be easily converted back to ChangeLogEntries
Code Block |
---|
##################################### ## Messaging integration with change log, send change log entries to a messaging system ##################################### # note, change "messaging" in key to be the name of the consumer. e.g. changeLog.consumer.myAzureConsumer.class #changeLog.consumer.messaging.class = edu.internet2.middleware.grouper.changeLog.ChangeLogConsumerToMessage #changeLog.consumer.messaging.quartzCron = 0 * * * * * * ?* ? #changeLog.consumer.messaging.messagingSystemName = grouperBuiltinMessaging # queue or topic #changeLog.consumer.messaging.messagingSystemNamemessageQueueType = grouperBuiltinMessagingqueue #changeLog.consumer.messaging.queueOrTopicqueueOrTopicName = abc |
Change log JSON
Code Block |
---|
{ "event":[ { "changeLogTypeId":"7db1fb2d34944668bc7d4485f1c7854b", "contextId":"7e4ca9c1dfd14111a60c8b057e5ae5aa", "createdOnDb":1458577616745723, "sequenceNumber":473, "changeLogTypeCategory":"privilege", "changeLogTypeAction":"addPrivilege", "field_id":"435e7190d1814d86a26e6bbd70e9bca3", "field_privilegeName":"read", "field_subjectId":"GrouperAll", "field_sourceId":"g:isa", "field_privilegeType":"access", "field_ownerType":"group", "field_ownerId":"7d09cfff2b444db696a0771b224081ef", "field_ownerName":"test:testGroup3", "field_memberId":"e6e154ea21f64c0d9ecb9f4137ab1ac3", "field_fieldId":"5da0071e39ff4fb2a17cec73ac25efb3", "field_membershipType":"flattened" } ] } |
...
Code Block |
---|
String json = changeLogEntry.toJson(true); ChangeLogEntry newEntry = ChangeLogEntry.fromJsonToCollection(json).iterator().next(); |
Messaging ESB change log consumer, configure in grouper-loader.properties
Code Block |
---|
##################################### ## Messaging integration with ESB, send change log entries to a messaging system ##################################### # note, change "messagingEsb" in key to be the name of the consumer. e.g. changeLog.consumer.myAzureConsumer.class #changeLog.consumer.messagingEsb.class = edu.internet2.middleware.grouper.changeLog.esb.consumer.EsbConsumer #changeLog.consumer.messagingEsb.quartzCron = 0 * * * * ? #changeLog.consumer.messagingEsb.elfilter = event.eventType eq 'GROUP_DELETE' || event.eventType eq 'GROUP_ADD' || event.eventType eq 'MEMBERSHIP_DELETE' || event.eventType eq 'MEMBERSHIP_ADD' #changeLog.consumer.messagingEsb.publisher.class = edu.internet2.middleware.grouper.changeLog.esb.consumer.EsbMessagingPublisher #changeLog.consumer.messagingEsb.publisher.messagingSystemName = grouperBuiltinMessaging # queue or topic #changeLog.consumer.messagingEsb.messageQueueType = queue #changeLog.consumer.messagingEsb.publisher.queueOrTopicqueueOrTopicName = abc |
Configure builtin messaging cleanup jobs in grouper-loader.properties
Code Block |
---|
################################## ## grouper builtin messaging cleanup cron ################################## #quartz cron-like schedule for grouper messaging daemon. #leave blank to disable this, the default is every hour, 10 minutes after the hour #this daemon does cleanup on the builtin messaging table changeLog.builtinMessagingDaemon.quartz.cron = 0 10 * * * ? # after three days of not consuming messages, delete them, if -1, dont run this daemon grouper.builtin.messaging.deleteAllMessagesMoreThanHoursOld = 72 # after three hours of having processed messages, delete them. Note, if this is -1 just delete when marking processed grouper.builtin.messaging.deleteProcessedMessagesMoreThanMinutesOld = 180 |
Test messaging
Run the test: edu.internet2.middleware.grouper.messaging.GrouperBuiltinMessagingSystemTest
...