Skip to main content

Redis pub/sub on Spring-Data


Center 의 MDM 으로부터 Overseas 의 분산 서버로 master data 를 배포하는 구조에 적용하려 함.


1. config

<?xml version="1.0" encoding="UTF-8"?>

<!--
  Spring bean definitions for Redis pub/sub with Spring Data Redis:
  a Jedis connection factory, a string-serializing RedisTemplate (publisher side),
  and a listener container that subscribes a message listener to one channel.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:cache="http://www.springframework.org/schema/cache"
xmlns:c="http://www.springframework.org/schema/c"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
      http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd">

<!-- Jedis connection factory targeting the Redis server at localhost:7379. -->
<!-- NOTE(review): the password is stored here in plain text; consider externalizing it. -->
<bean id="connectionFactory"
 class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
 <property name="hostName" value="localhost" />
 <property name="port" value="7379" />
 <property name="password" value="deweyhong" />
</bean>

<!-- Single serializer instance shared by the template's key, hash-key and value slots. -->
<bean id="stringRedisSerializer" class="org.springframework.data.redis.serializer.StringRedisSerializer" />

<!-- Template used for publishing. Keys, hash keys and values all go through the string
     serializer, so payloads handed to this template are expected to be Strings. -->
<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate" lazy-init="false">
<property name="connectionFactory" ref="connectionFactory" />
<property name="keySerializer" ref="stringRedisSerializer" />
<property name="hashKeySerializer" ref="stringRedisSerializer" />
<property name="valueSerializer" ref="stringRedisSerializer" />
</bean>

<!-- Adapter whose delegate is the application's RedisMessageListener. -->
<bean id="messageListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
        <bean class="lhf.common.redis.service.support.RedisMessageListener"/>
    </constructor-arg>
</bean>
<!-- Container that registers the listener above on the channel topic "USER". -->
<bean id="redisMessageListenerContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="messageListeners">
      <!-- map of listeners and their associated topics (channels or/and patterns) -->
      <map>
        <entry key-ref="messageListener">
            <bean class="org.springframework.data.redis.listener.ChannelTopic">
               <constructor-arg value="USER"/>
            </bean>
        </entry>
      </map>
    </property>
 </bean>

</beans>

2. publisher (marshalling 적용)

    public void publishData(Object data){
        try{
            JAXBContext jaxbContext;
            if(data instanceof Data){
                jaxbContext = JAXBContext.newInstance(Data.class);
                Marshaller jaxbMarshaller = jaxbContext.createMarshaller();
                jaxbMarshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
                jaxbMarshaller.marshal((Data)data, (OutputStream)output);
                redisTemplate.convertAndSend(((Data)data).getObjectKey(), data);
            }else if(data instanceof Lists){
                jaxbContext = JAXBContext.newInstance(Lists.class, Data.class);
                Marshaller jaxbMarshaller = jaxbContext.createMarshaller();
                jaxbMarshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
                jaxbMarshaller.marshal((Lists)data, (OutputStream)output);
                redisTemplate.convertAndSend(((Lists<Data>)data).getValues().get(0).getObjectKey(), output.toString());
            }
        }catch(JAXBException e){
            e.printStackTrace();
        }
    }


3. 호출

            // Collect the users to distribute into one Lists container.
            Lists<Data> userList = new Lists<Data>();
            userList.getValues().add(user1);
            userList.getValues().add(user2);
         
            // Publish the list; this is what is meant to trigger onMessage on the subscriber side.
            service.publishData(userList);  


4. listener (unmarshalling 적용)

public class RedisMessageListener implements MessageListener {

    public void onMessage(final Message message, final byte[] pattern){
        try{
            String strData = message.toString();
            strData = "<?" + strData.substring(strData.indexOf("xml "), strData.length()); // 깨짐 방지
            System.out.println("Message received: " + strData);
            if(strData.indexOf("<lists>") == -1){
                JAXBContext jaxbContext = JAXBContext.newInstance(Data.class);
                Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
                Data data = (Data)jaxbUnmarshaller.unmarshal(createInputStream(strData, null));
            }else{
                JAXBContext jaxbContext = JAXBContext.newInstance(Lists.class, Data.class);
                Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
                Lists data = (Lists)jaxbUnmarshaller.unmarshal(createInputStream(strData, null));
            }
        }catch(Exception e){
            e.printStackTrace();
        }
    }

    private InputStream createInputStream(String s, String charset) throws java.io.UnsupportedEncodingException{
        if(charset != null) {
            return new ByteArrayInputStream(s.getBytes(charset));
        } else {
            return new ByteArrayInputStream(s.getBytes());
        }
    }
}

=> RedisMessageListener 로 들어온 data 를 적절히 활용





Comments