`
dacoolbaby
  • 浏览: 1254918 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

开发自定义同步到MongoDB的OutputFormat

阅读更多

需求是将Hadoop输出的数据插入到MongoDB中。

 

处理方式是将每行以 \001 分隔的字符串转换成一个类似Map的对象(键值对文档),再插入到数据库中,以此替换原有的单线程写入接口。

 

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;

public class MongoOutputFormat implements OutputFormat<Text, Text> {

    /**
     * Verifies that the MongoDB settings this format cannot work without are present in the job
     * configuration, so a misconfigured job fails once at submission instead of in every task.
     *
     * <p>{@code mongoIp} is deliberately not required: it is passed straight to the driver, which
     * may apply its own default host.
     *
     * @param ignored the job's file system (unused: output goes to MongoDB, not to a file system)
     * @param job the job configuration carrying the {@code mongo*} settings
     * @throws IOException if {@code mongoPort}, {@code mongoDb} or {@code mongoCollection} is
     *     missing or blank
     */
    @Override
    public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
        requireSetting(job, "mongoPort");
        requireSetting(job, "mongoDb");
        requireSetting(job, "mongoCollection");
    }

    /**
     * Opens a MongoDB connection from the job configuration and wraps it in a record writer that
     * owns the connection for its whole lifetime (it is closed in
     * {@link MongoDBRecordWriter#close(Reporter)}).
     *
     * <p>Configuration keys read: {@code mongoIp}, {@code mongoPort}, {@code mongoDb},
     * {@code mongoCollection}, and optionally {@code muser} / {@code mpwd}. When {@code muser} is
     * unset, no authentication is attempted.
     *
     * @param ignored the job's file system (unused)
     * @param conf the job configuration holding the connection settings
     * @param name the task's output name (unused)
     * @param progress progress reporter (unused)
     * @return a writer bound to the configured collection
     * @throws IOException if a required setting is missing or invalid, or the connection or
     *     authentication fails
     */
    @Override
    public RecordWriter<Text, Text> getRecordWriter(FileSystem ignored, JobConf conf, String name,
            Progressable progress) throws IOException {
        String ip = conf.get("mongoIp");
        String portText = requireSetting(conf, "mongoPort");
        int port;
        try {
            port = Integer.parseInt(portText.trim());
        } catch (NumberFormatException e) {
            throw new IOException("Invalid mongoPort '" + portText + "'", e);
        }
        String dbname = requireSetting(conf, "mongoDb");
        String collectionName = requireSetting(conf, "mongoCollection");
        String username = conf.get("muser");
        String password = conf.get("mpwd");

        Mongo mongo = new Mongo(ip, port);
        try {
            return new MongoDBRecordWriter(mongo, dbname, collectionName, username, password);
        } catch (Exception ex) {
            // The writer never took ownership of the client, so release it here to avoid a leak.
            mongo.close();
            throw new IOException(ex);
        }
    }

    /**
     * Returns the value of a required configuration key.
     *
     * @throws IOException if the key is missing or blank
     */
    private static String requireSetting(JobConf conf, String key) throws IOException {
        String value = conf.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IOException("Missing required MongoDB setting '" + key + "' in job configuration");
        }
        return value;
    }

    /**
     * A RecordWriter that converts each {@code \001}-separated reduce output line into a document
     * and saves it into a MongoDB collection. The key is ignored.
     */
    public static class MongoDBRecordWriter implements RecordWriter<Text, Text> {

        /** Minimum field count a record needs so that index 10 ({@code updated_date}) exists. */
        private static final int MIN_FIELD_COUNT = 11;

        private final DBCollection coll;
        /** Client owned by this writer; null when the collection was supplied by the caller. */
        private final Mongo mongo;

        /** Creates a writer with no target collection; {@link #write} will fail until replaced. */
        public MongoDBRecordWriter() {
            this.coll = null;
            this.mongo = null;
        }

        /**
         * Creates a writer around a caller-owned collection. {@link #close(Reporter)} will not
         * close the underlying client in this case.
         */
        public MongoDBRecordWriter(DBCollection coll) {
            this.coll = coll;
            this.mongo = null;
        }

        /**
         * Creates a writer that takes ownership of {@code mongo}, optionally authenticates, and
         * binds to the named collection.
         *
         * @param mongo connected client; closed by {@link #close(Reporter)}
         * @param dbname database name
         * @param collectionName collection name
         * @param username user to authenticate as, or null to skip authentication
         * @param password password for {@code username}; must be non-null when a username is given
         * @throws IllegalArgumentException if a username is given without a password
         */
        public MongoDBRecordWriter(Mongo mongo, String dbname, String collectionName,
                String username, String password) {
            this.mongo = mongo;
            DB db = mongo.getDB(dbname);
            if (username != null) {
                if (password == null) {
                    throw new IllegalArgumentException(
                            "MongoDB password (mpwd) must be set when a user (muser) is configured");
                }
                db.authenticate(username, password.toCharArray());
            }
            this.coll = db.getCollection(collectionName);
        }

        /** Returns the collection records are written to (may be null for the no-arg writer). */
        public DBCollection getCollection() {
            return coll;
        }

        /**
         * Releases the MongoDB client this writer owns. It is a no-op when the writer was built
         * around a caller-supplied collection.
         */
        @Override
        public void close(Reporter reporter) throws IOException {
            if (mongo == null) {
                return;
            }
            try {
                mongo.close();
            } catch (RuntimeException e) {
                throw new IOException(e);
            }
        }

        /**
         * Saves one record. The value is split on {@code \001}, and fields 7-10 are stored as
         * {@code created_by}, {@code created_date}, {@code updated_by} and {@code updated_date}.
         * Records that are null or have too few fields are reported on stderr and skipped.
         *
         * @throws IOException if the database rejects the save, so Hadoop can fail and retry the
         *     task instead of silently losing data
         */
        @Override
        public void write(Text key, Text value) throws IOException {
            if (coll == null) {
                throw new IOException("No MongoDB collection configured for this writer");
            }
            if (value == null) {
                return;
            }
            // Limit -1 keeps trailing empty fields, so an empty last column (updated_date)
            // does not shrink the array and break the index lookups below.
            String[] fields = value.toString().split("\001", -1);
            if (fields.length < MIN_FIELD_COUNT) {
                System.err.println("Skipping malformed record with " + fields.length
                        + " fields (need at least " + MIN_FIELD_COUNT + "): " + value);
                return;
            }

            DBObject doc = new BasicDBObject();
            doc.put("created_by", fields[7]);
            doc.put("created_date", fields[8]);
            doc.put("updated_by", fields[9]);
            doc.put("updated_date", fields[10]);

            try {
                coll.save(doc);
            } catch (RuntimeException e) {
                throw new IOException("Failed to save record to MongoDB", e);
            }
        }
    }
}

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics