Skip to main content

Redis pub/sub on Spring-Data


Center의 MDM으로부터 Overseas의 분산 서버로 master data를 분산 배포하는 구조에 적용하려 함.


1. config

<?xml version="1.0" encoding="UTF-8"?>

<!-- Spring context for Redis pub/sub: connection, template and a listener container subscribed to channel USER. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:cache="http://www.springframework.org/schema/cache"
       xmlns:c="http://www.springframework.org/schema/c"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd">

    <!-- Jedis-backed connection to the Redis server.
         NOTE(review): host, port and password are hard-coded here; consider externalizing them. -->
    <bean id="connectionFactory"
          class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
        <property name="hostName" value="localhost" />
        <property name="port" value="7379" />
        <property name="password" value="deweyhong" />
    </bean>

    <!-- Shared serializer used for keys, hash keys and values of the template below. -->
    <bean id="stringRedisSerializer"
          class="org.springframework.data.redis.serializer.StringRedisSerializer" />

    <!-- Template used by the publisher; everything is serialized as a plain string. -->
    <bean id="redisTemplate"
          class="org.springframework.data.redis.core.RedisTemplate"
          lazy-init="false">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="keySerializer" ref="stringRedisSerializer" />
        <property name="hashKeySerializer" ref="stringRedisSerializer" />
        <property name="valueSerializer" ref="stringRedisSerializer" />
    </bean>

    <!-- Adapter wrapping the application's RedisMessageListener as its delegate. -->
    <bean id="messageListener"
          class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
        <constructor-arg>
            <bean class="lhf.common.redis.service.support.RedisMessageListener" />
        </constructor-arg>
    </bean>

    <!-- Container that subscribes the listener above to its topics. -->
    <bean id="redisMessageListenerContainer"
          class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="messageListeners">
            <!-- map of listeners and their associated topics (channels or/and patterns) -->
            <map>
                <entry key-ref="messageListener">
                    <bean class="org.springframework.data.redis.listener.ChannelTopic">
                        <constructor-arg value="USER" />
                    </bean>
                </entry>
            </map>
        </property>
    </bean>

</beans>

2. publisher (marshalling 적용)

    public void publishData(Object data){
        try{
            JAXBContext jaxbContext;
            if(data instanceof Data){
                jaxbContext = JAXBContext.newInstance(Data.class);
                Marshaller jaxbMarshaller = jaxbContext.createMarshaller();
                jaxbMarshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
                jaxbMarshaller.marshal((Data)data, (OutputStream)output);
                redisTemplate.convertAndSend(((Data)data).getObjectKey(), data);
            }else if(data instanceof Lists){
                jaxbContext = JAXBContext.newInstance(Lists.class, Data.class);
                Marshaller jaxbMarshaller = jaxbContext.createMarshaller();
                jaxbMarshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
                jaxbMarshaller.marshal((Lists)data, (OutputStream)output);
                redisTemplate.convertAndSend(((Lists<Data>)data).getValues().get(0).getObjectKey(), output.toString());
            }
        }catch(JAXBException e){
            e.printStackTrace();
        }
    }


3. 호출

            // Collect the users to distribute into one list so they are published as a single message.
            Lists<Data> userList = new Lists<Data>();
            userList.getValues().add(user1);
            userList.getValues().add(user2);
         
            // Publish the list. Original note: "calls onMessage" — presumably the subscribed
            // listener's onMessage runs as a result; confirm the channel name matches the subscription.
            service.publishData(userList);  


4. listener (unmarshalling 적용)

public class RedisMessageListener implements MessageListener {

    public void onMessage(final Message message, final byte[] pattern){
        try{
            String strData = message.toString();
            strData = "<?" + strData.substring(strData.indexOf("xml "), strData.length()); // 깨짐 방지
            System.out.println("Message received: " + strData);
            if(strData.indexOf("<lists>") == -1){
                JAXBContext jaxbContext = JAXBContext.newInstance(Data.class);
                Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
                Data data = (Data)jaxbUnmarshaller.unmarshal(createInputStream(strData, null));
            }else{
                JAXBContext jaxbContext = JAXBContext.newInstance(Lists.class, Data.class);
                Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
                Lists data = (Lists)jaxbUnmarshaller.unmarshal(createInputStream(strData, null));
            }
        }catch(Exception e){
            e.printStackTrace();
        }
    }

    private InputStream createInputStream(String s, String charset) throws java.io.UnsupportedEncodingException{
        if(charset != null) {
            return new ByteArrayInputStream(s.getBytes(charset));
        } else {
            return new ByteArrayInputStream(s.getBytes());
        }
    }
}

=> RedisMessageListener 로 들어온 data 를 적절히 활용





Comments

Popular posts from this blog

Amazon RDS Blue/Green Deployments

In order to avoid some errors I experienced when proceeding as described in the official documentation, I describe what I did in order. 1) Modify parameters of source_database * error: Blue Green Deployments requires cluster parameter group has binlog enabled. RDS Parameter groups: source-params-group binlog_format => MIXED mysql> show global variables like 'binlog_format'; 2) Insert a row after rebooting the source database, to avoid this error. * error: Correct the replication errors and then switch over. Read Replica Replication Error - IOError: 1236, reason: Got fatal error 1236 from master when reading data from binary log: 'Could not find first log file name in binary log index file' => To Fix: You need to change the data in the source database. INSERT INTO dummy_table ( `favorite_id` , `favorite_order` , `user_id` , `board_id` ) VALUES ('100001', '1', '11111', '11111'); 3) Modify the param...

Fluentd for mysql in AWS

(0) preparation ulimit -n If your console shows 1024, it is insufficient. Please add following lines to your /etc/security/limits.conf file and reboot your machine. root soft nofile 65536 root hard nofile 65536 (1) install Fluentd // “Ubuntu 12.04 LTS / Precise” curl -L http://toolbelt.treasuredata.com/sh/install-ubuntu-precise.sh | sh /etc/init.d/td-agent start/stop/restart/status // test curl -X POST -d 'json={"json":"message"}' http://localhost:8888/debug.test /etc/init.d/td-agent stop chown: changing ownership of `/var/run/td-agent/td-agent.pid': Operation not permitted chown: changing ownership of `/var/run/td-agent': Operation not permitted  * Stopping td-agent td-agent                                                                                   ...

Install CoreOs on linode without VM

Install CoreOs on linode without VM 1. Add a Linode 2. Create a new Disk   CoreOS 3. Rescue > Reboot into Rescue Mode 4. Remote Access   Launch Lish Console 5. make an install script cat <<'EOF1' > install.sh # add needed package sudo apt-get update sudo apt-get install -y curl wget whois sudo apt-get install -y ca-certificates #sudo apt-get install gawk -y # get discovery url discoveryUrl=`curl https://discovery.etcd.io/new` # write cloud-config.yml cat <<EOF2 > cloud-config.yml #cloud-config users:   - name: core     groups:       - sudo       - docker coreos:   etcd:     name: node01     discovery: $discoveryUrl hostname: node01 EOF2 # get the coreos installation script #wget https://raw.github.com/coreos/init/master/bin/coreos-install wget https://raw.githubusercontent.com/coreos/init/master/bin/coreos-install # run installation chmod 75...