Monday, June 6, 2016

Streaming data from Result Set

This example shows a way to stream the database records in a JSON format. This post is done against postgres db and a table is assumed. However you can point to any table structure and db after doing required adjustments.


public class StreamingService {

  public void handleRequest(String sql, OutputStream op) throws IOException 
{
   //Initialize the driver
   try {
 Class.forName("org.postgresql.Driver");
   } catch (Exception e) {

   }


   //DB connection parameters
   String connUrl = "jdbc:postgresql://localhost/example";
   String userName = "<Give DB username>";
   String pass = "<Give DB password>";


   //Connect to DB
   try (Connection con = DriverManager.getConnection(connUrl, userName, pass); PreparedStatement ps = con.prepareStatement(sql);) 
{

  //Find the column names. This helps in building key for JSON
  try (ResultSet rs = ps.executeQuery();) 
    {
ResultSetMetaData rsMeta = rs.getMetaData();
        int columnCnt = rsMeta.getColumnCount();
        List<String> columnNames = new ArrayList<String>();
        for(int i=1;i<=columnCnt;i++) {
          columnNames.add(rsMeta.getColumnName(i).toUpperCase());
    }
   
      while (rs.next()) {
   JSONObject obj = new JSONObject();
   for(int i=1;i<=columnCnt;i++) {
      String key = columnNames.get(i - 1);
      String value = rs.getString(i);
      obj.put(key, value);
      op.write(obj.toString().getBytes());
      op.flush();
}
     }
    }
   } catch (SQLException e) {
       e.printStackTrace();
    }
  }
}

Main method to call the service and send an output stream where the service can write the data

public class StreamsFromDatabase

{
public static void main(String[] args) throws IOException, InterruptedException {

  PipedInputStream in = new PipedInputStream();
  PipedOutputStream out = new PipedOutputStream(in);

  String sql = "Select * from bollywood";

  StreamingService streamingService = new StreamingService();
    new Thread(new Runnable() {
public void run() {
 try {
   streamingService.handleRequest(sql, out);
 } catch (IOException e) {
    e.printStackTrace();
     }
   }
  }).start();

  while (true) {
    if (in.available() > 0) {
byte[] b = new byte[1000];
in.read(b);
System.out.println(new String(b));
Thread.sleep(1000);
     }
   } 
  }

}




No comments:

Post a Comment